Приложение My Spring использует очередь ActiveMQ.Возможны два подхода.Начальная часть интеграции ActiveMQ одинакова для обоих подходов:
@Bean
public ConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory();
}
@Bean
public Queue notificationQueue() {
return resolveAvcQueueByJNDIName("java:comp/env/jms/name.not.important.queue");
}
Подход с одним потоком:
@Bean
public IntegrationFlow orderNotify() {
return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory()).destination(notificationQueue()),
c -> c.poller(Pollers.fixedDelay(QUEUE_POLLING_INTERVAL_MS)
.errorHandler(e -> logger.error("Can't handle incoming message", e))))
.handle(...).get();
}
Но я хочу использовать сообщения, используя несколько рабочих потоков, поэтому я произвел рефакторингкод от входящего адаптера к адаптеру канала, управляемого сообщениями:
@Bean
public IntegrationFlow orderNotify() {
return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory()).configureListenerContainer(c -> {
final DefaultMessageListenerContainer container = c.get();
container.setMaxConcurrentConsumers(notifyThreadPoolSize);
}).destination(notificationQueue()))
.handle(...).get();
}
Проблема заключается в том, что приложение не останавливает потребителя ActiveMQ при его повторном развертывании в Tomcat или при перезапуске для второго подхода.Это создает нового потребителя во время его запуска.Но все новые сообщения направляются старому «мертвому» потребителю, поэтому они размещаются в разделе «Ожидающие сообщения» и никогда не удаляются.
В чем здесь проблема?