Истек срок действия сообщений в журнале Spring-boot activemq - PullRequest
0 голосов
/ 12 марта 2020

У меня простое приложение с весенней загрузкой, и я хочу записать сообщение с истекшим сроком действия; после прочтения большого количества билетов, прочтения множества учебных пособий, это не сработало.

это мое приложение SpringBoot

@SpringBootApplication
public class Run implements ApplicationRunner {
    private static Logger   log = LoggerFactory.getLogger(Run.class);
    @Autowired
    private OrderSender     orderSender;

    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        log.info("Spring Boot Embedded ActiveMQ Configuration Example");
        for (int i = 0; i < 5; i++) {
            Order myMessage = new Order(i + " - Sending JMS Message using Embedded activeMQ", new Date());
            orderSender.send(myMessage);
            TimeUnit.MILLISECONDS.sleep(500);
        }
        log.info("Waiting for all ActiveMQ JMS Messages to be consumed");
        int ttl = 10;
        TimeUnit.SECONDS.sleep(ttl);
        log.info(ttl + " seconds elapsed," + ttl + " more seconds to see DLQ log");
        TimeUnit.SECONDS.sleep(ttl);
        System.exit(-1);
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Run.class, args);
    }
}

это мой MessageSender

@Service
public class OrderSender {
    //
    private static Logger   log = LoggerFactory.getLogger(OrderSender.class);
    @Autowired
    private JmsTemplate     jmsTemplate;

    public void send(Order myMessage) {
        log.info("sending with convertAndSend() to queue <" + myMessage + ">");
        jmsTemplate.convertAndSend(QUEUE, myMessage);
    }
}

это мой потребитель очереди [я пытался написать их отдельно без успеха]

@Component
public class OrderConsumer {
    private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    @JmsListener(destination = QUEUE)
    public void receiveMessage(Order order) throws InterruptedException {
        String logPrefix = UUID.randomUUID().toString() + " - ";
        log.info(logPrefix + "received <" + order + ">");
        log.debug(logPrefix + "- - - - - - - - SLEEP - - - - - - - - - - - - -");
        TimeUnit.SECONDS.sleep(8);
        log.debug(logPrefix + "- - - - - - - - END SLEEP - - - - - - - - - - -");
    }

    @JmsListener(destination = QUEUE_DLQ)
    public void receiveMessageDlq(Order order) throws InterruptedException {
        String logPrefix = UUID.randomUUID().toString() + " - ";
        log.info(logPrefix + "DLQ <" + order + ">");
    }
}

это мой компонент конфигурации

@EnableJms
@Configuration
public class ActiveMQConfig {
    //
    public static final String  QUEUE       = "orderqueue";
    public static final String  SUFFIX      = ".dlq";
    public static final String  QUEUE_DLQ   = QUEUE + SUFFIX;

    @Bean
    public BrokerService broker(@Autowired DeadLetterStrategy strategy) throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector("vm://embedded?broker.persistent=false,useShutdownHook=false");
        //
        PolicyEntry entry = new PolicyEntry();
        entry.setDestination(new ActiveMQQueue("*"));
        // entry.setDestination(new ActiveMQQueue(">"));
        entry.setDeadLetterStrategy(strategy);
        //
        PolicyMap map = new PolicyMap();
        map.setPolicyEntries(Arrays.asList(entry));
        broker.setDestinationPolicy(map);
        //
        return broker;
    }

    @Bean
    public DeadLetterStrategy deadLetterStrategy() {
        IndividualDeadLetterStrategy ids = new IndividualDeadLetterStrategy(); //
        ids.setQueueSuffix(SUFFIX);
        ids.setUseQueueForQueueMessages(true);
        return ids;
    }

    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setMessageConverter(messageConverter());
        return factory;
    }

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    public Queue myQueue() {
        ActiveMQQueue queue = new ActiveMQQueue(QUEUE);
        return queue;
    }
}

это в моем application.properties

spring.jms.listener.max-concurrency=1
spring.jms.template.time-to-live=5000

это у меня в поме. xml [часть]

<properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-kahadb-store</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

и вот мои журналы

Starting Run on ...
No active profile set, falling back to default profiles: default
Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@4516af24: startup date [Thu Mar 12 14:46:40 CET 2020]; root of context hierarchy
Using Persistence Adapter: KahaDBPersistenceAdapter[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\KahaDB]
JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
KahaDB is version 6
Recovering from the journal @1:104464
Recovery replayed 4 operations from the journal in 0.02 seconds.
PListStore:[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\tmp_storage] started
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is starting
Connector vm://embedded?broker.persistent=false,useShutdownHook=false started
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) started
For help or more information please see: http://activemq.apache.org
Registering beans for JMX exposure on startup
Starting beans in phase 2147483647
Connector vm://localhost started
Spring Boot Embedded ActiveMQ Configuration Example
>> sending with convertAndSend() to queue <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:41 CET 2020}>
acbc670e-de0c-49ac-96ef-763675920069 - received <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:41 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='1 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:42 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='2 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:43 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='3 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:43 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='4 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:44 CET 2020}>
Waiting for all ActiveMQ JMS Messages to be consumed
10 seconds elapsed,10 more seconds to see DLQ log
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is shutting down
Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@4516af24: startup date [Thu Mar 12 14:46:40 CET 2020]; root of context hierarchy
Connector vm://embedded?broker.persistent=false,useShutdownHook=false stopped
Stopping beans in phase 2147483647
Connector vm://localhost stopped
Setup of JMS message listener invoker failed for destination 'orderqueue.dlq' - trying to recover. Cause: peer (vm://localhost#1) stopped.
PListStore:[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\tmp_storage] stopped
Stopping async queue tasks
Stopping async topic tasks
Setup of JMS message listener invoker failed for destination 'orderqueue' - trying to recover. Cause: peer (vm://localhost#3) stopped.
Stopped KahaDB
Unregistering JMX-exposed beans on shutdown
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) uptime 23.525 seconds
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is shutdown

, как вы видите, я отправляю 5 сообщений , обработал только 1 и никто в DLQ ... Я потратил много времени здесь, на стеке, не найдя решения. Пожалуйста, кто-нибудь, помогите мне!

...