У меня простое приложение с весенней загрузкой, и я хочу записать сообщение с истекшим сроком действия; после прочтения большого количества билетов, прочтения множества учебных пособий, это не сработало.
это мое приложение SpringBoot
@SpringBootApplication
public class Run implements ApplicationRunner {
private static Logger log = LoggerFactory.getLogger(Run.class);
@Autowired
private OrderSender orderSender;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
log.info("Spring Boot Embedded ActiveMQ Configuration Example");
for (int i = 0; i < 5; i++) {
Order myMessage = new Order(i + " - Sending JMS Message using Embedded activeMQ", new Date());
orderSender.send(myMessage);
TimeUnit.MILLISECONDS.sleep(500);
}
log.info("Waiting for all ActiveMQ JMS Messages to be consumed");
int ttl = 10;
TimeUnit.SECONDS.sleep(ttl);
log.info(ttl + " seconds elapsed," + ttl + " more seconds to see DLQ log");
TimeUnit.SECONDS.sleep(ttl);
System.exit(-1);
}
public static void main(String[] args) throws Exception {
SpringApplication.run(Run.class, args);
}
}
это мой MessageSender
@Service
public class OrderSender {
//
private static Logger log = LoggerFactory.getLogger(OrderSender.class);
@Autowired
private JmsTemplate jmsTemplate;
public void send(Order myMessage) {
log.info("sending with convertAndSend() to queue <" + myMessage + ">");
jmsTemplate.convertAndSend(QUEUE, myMessage);
}
}
это мой потребитель очереди [я пытался написать их отдельно без успеха]
@Component
public class OrderConsumer {
private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);
@JmsListener(destination = QUEUE)
public void receiveMessage(Order order) throws InterruptedException {
String logPrefix = UUID.randomUUID().toString() + " - ";
log.info(logPrefix + "received <" + order + ">");
log.debug(logPrefix + "- - - - - - - - SLEEP - - - - - - - - - - - - -");
TimeUnit.SECONDS.sleep(8);
log.debug(logPrefix + "- - - - - - - - END SLEEP - - - - - - - - - - -");
}
@JmsListener(destination = QUEUE_DLQ)
public void receiveMessageDlq(Order order) throws InterruptedException {
String logPrefix = UUID.randomUUID().toString() + " - ";
log.info(logPrefix + "DLQ <" + order + ">");
}
}
это мой компонент конфигурации
@EnableJms
@Configuration
public class ActiveMQConfig {
//
public static final String QUEUE = "orderqueue";
public static final String SUFFIX = ".dlq";
public static final String QUEUE_DLQ = QUEUE + SUFFIX;
@Bean
public BrokerService broker(@Autowired DeadLetterStrategy strategy) throws Exception {
BrokerService broker = new BrokerService();
broker.addConnector("vm://embedded?broker.persistent=false,useShutdownHook=false");
//
PolicyEntry entry = new PolicyEntry();
entry.setDestination(new ActiveMQQueue("*"));
// entry.setDestination(new ActiveMQQueue(">"));
entry.setDeadLetterStrategy(strategy);
//
PolicyMap map = new PolicyMap();
map.setPolicyEntries(Arrays.asList(entry));
broker.setDestinationPolicy(map);
//
return broker;
}
@Bean
public DeadLetterStrategy deadLetterStrategy() {
IndividualDeadLetterStrategy ids = new IndividualDeadLetterStrategy(); //
ids.setQueueSuffix(SUFFIX);
ids.setUseQueueForQueueMessages(true);
return ids;
}
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setMessageConverter(messageConverter());
return factory;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public Queue myQueue() {
ActiveMQQueue queue = new ActiveMQQueue(QUEUE);
return queue;
}
}
это в моем application.properties
spring.jms.listener.max-concurrency=1
spring.jms.template.time-to-live=5000
это у меня в поме. xml [часть]
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.8.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
и вот мои журналы
Starting Run on ...
No active profile set, falling back to default profiles: default
Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@4516af24: startup date [Thu Mar 12 14:46:40 CET 2020]; root of context hierarchy
Using Persistence Adapter: KahaDBPersistenceAdapter[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\KahaDB]
JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
KahaDB is version 6
Recovering from the journal @1:104464
Recovery replayed 4 operations from the journal in 0.02 seconds.
PListStore:[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\tmp_storage] started
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is starting
Connector vm://embedded?broker.persistent=false,useShutdownHook=false started
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) started
For help or more information please see: http://activemq.apache.org
Registering beans for JMX exposure on startup
Starting beans in phase 2147483647
Connector vm://localhost started
Spring Boot Embedded ActiveMQ Configuration Example
>> sending with convertAndSend() to queue <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:41 CET 2020}>
acbc670e-de0c-49ac-96ef-763675920069 - received <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:41 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='1 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:42 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='2 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:43 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='3 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:43 CET 2020}>
>> sending with convertAndSend() to queue <Order{content='4 - Sending JMS Message using Embedded activeMQ', timestamp=Thu Mar 12 14:46:44 CET 2020}>
Waiting for all ActiveMQ JMS Messages to be consumed
10 seconds elapsed,10 more seconds to see DLQ log
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is shutting down
Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@4516af24: startup date [Thu Mar 12 14:46:40 CET 2020]; root of context hierarchy
Connector vm://embedded?broker.persistent=false,useShutdownHook=false stopped
Stopping beans in phase 2147483647
Connector vm://localhost stopped
Setup of JMS message listener invoker failed for destination 'orderqueue.dlq' - trying to recover. Cause: peer (vm://localhost#1) stopped.
PListStore:[C:\developements\customers\m2sc\workspace\spring-embedded-activemq\activemq-data\localhost\tmp_storage] stopped
Stopping async queue tasks
Stopping async topic tasks
Setup of JMS message listener invoker failed for destination 'orderqueue' - trying to recover. Cause: peer (vm://localhost#3) stopped.
Stopped KahaDB
Unregistering JMX-exposed beans on shutdown
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) uptime 23.525 seconds
Apache ActiveMQ 5.14.5 (localhost, ID:INT-PWTOW02-56695-1584020801379-0:1) is shutdown
, как вы видите, я отправляю 5 сообщений , обработал только 1 и никто в DLQ ... Я потратил много времени здесь, на стеке, не найдя решения. Пожалуйста, кто-нибудь, помогите мне!