У меня есть фабричный класс, который я использую для создания потребителя mq и bean-компонента DefaultMessageListenerContainer от этого потребителя, который будет использоваться для получения сообщений от topi c. Это выглядит так -
public class MQMessageFactory {
public static DemoMessageConsumer createMessageConumer(Map<String, String> queueDetails, Map<String, String> sslDetails) throws Exception {
MQConnectionFactory mqConnectionFactory = createMQConnectionFactory(queueDetails, sslDetails);
return new DemoMessageConsumer(queueDetails, mqConnectionFactory);
}
public static MQConnectionFactory createMQConnectionFactory(Map<String, String> queue, Map<String, String> sslDetails) throws Exception {
MQConnectionFactory cf = new MQConnectionFactory();
try {
cf.setHostName(queue.get("hostname"));
cf.setPort(queue.get("port"));
cf.setQueueManager(queue.get("queueManager"));
cf.setChannel(queue.get("channel"));
cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
cf.setStringProperty(WMQConstants.USERID, queue.get("username"));
cf.setStringProperty(WMQConstants.PASSWORD, queue.get("password"));
cf.setSSLCipherSuite(queue.get("sslCipherSuite"));
cf.setSSLSocketFactory(someMethodToCreateSSLContextFactory(sslDetails));
return cf;
} catch (JMSException e) {
throw new RuntimeException("Unable to establish connection with host: " + queue.get("hostname"), e);
}
}
}
public class DemoMessageConsumer implements SessionAwareMessageListener {
private static final Logger LOGGER = LogManager.getLogger(DemoMessageConsumer.class);
private SingleConnectionFactory singleCf;
private Map<String, String> properties;
private DefaultMessageListenerContainer container;
private Consumer<Message> messageProcessor;
public DemoMessageConsumer(Map<String, String> properties, MQConnectionFactory connectionFactory) {
this.singleCf = new SingleConnectionFactory(connectionFactory);
this.singleCf.setReconnectOnException(true);
this.singleCf.afterPropertiesSet();
this.properties = properties;
}
public DefaultMessageListenerContainer listen(String queueName, Executor taskExecutor, Consumer<Message> messageProcessor) {
this.messageProcessor = messageProcessor;
this.container = new DefaultMessageListenerContainer();
this.container.setConnectionFactory(singleCf);
this.container.setDestinationName(queueName);
// this.container.setAcceptMessagesWhileStopping(true);
this.container.setSessionTransacted(true);
this.container.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
this.container.setMessageListener(this);
this.container.setConcurrentConsumers(5);
this.container.setTaskExecutor(taskExecutor);
this.container.afterPropertiesSet();
this.container.start();
LOGGER.info("Consumer started");
return this.container;
}
@Override
public void onMessage(Message message, Session session) {
try {
LOGGER.info("Message received with MessageID: {}", message.getJMSMessageID());
this.messageProcessor.accept(message);
} catch (JMSException e) {
LOGGER.error("Error while processing the message", e);
}
}
public void triggerShutdown() {
LOGGER.info("Shutdown called");
this.container.stop();
while (this.container.isRunning()) ;
this.container.shutdown();
while (this.container.isActive()) ;
this.singleCf.destroy();
LOGGER.info("Listener is shutdown");
}
}
Далее, у меня есть класс загрузочного проекта Spring, где я на самом деле создаю компоненты и использую их для прослушивания очереди -
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "ibmmq")
public class MqConsumerImpl implements ApplicationListener<ContextClosedEvent> {
private static final Logger LOGGER = LogManager.getLogger(MqConsumerImpl.class);
public Map<String, String> ssl;
public Map<String, String> queue;
@Lazy
@Autowired
@Qualifier("mqConsumer")
private DemoMessageConsumer consumer;
@Bean("mqConsumer")
public DemoMessageConsumer createConsumer() throws Exception {
return MQMessageFactory.createMessageConumer(queue, ssl);
}
@Bean("mqListener")
public DefaultMessageListenerContainer listen() {
return this.consumer.listen(queue.get("name"), Executors.newFixedThreadPool(3), message -> {
try {
LOGGER.info("{} Message reading started: {} ", Thread.currentThread().getName(), message.getBody(String.class));
// My business logic goes here
Thread.sleep(1000);
LOGGER.info("{} Message reading completed: {} ", Thread.currentThread().getName(), message.getBody(String.class));
} catch (Exception e) {
LOGGER.error(e);
}
});
}
@Override
public void onApplicationEvent(ContextClosedEvent e) {
this.consumer.triggerShutdown();
}
}
Теперь я запускаю приложение, и сообщения правильно обрабатываются, и все идет хорошо. У меня есть TaskExecutor из трех потоков, и все они используются для потребления сообщений и выполнения задач.
Затем я запускаю команду, чтобы остановить приложение и один или все потоки, в которых выполнялась бизнес-задача, МОЖЕТ / НЕ МОЖЕТ выдать это предупреждение -
2020-01-14 16:29:15.110 WARN 68468 --- [pool-2-thread-3] o.s.j.l.DefaultMessageListenerContainer : Rejecting received message because of the listener container having been stopped in the meantime:
JMSMessage class: jms_text
JMSType: null
JMSDeliveryMode: 2
JMSDeliveryDelay: 0
JMSDeliveryTime: 0
JMSExpiration: 0
JMSPriority: 4
JMSMessageID: ID:414d5120484b49473033533120202020a2e4f05dce410b24
JMSTimestamp: 1578982170556
JMSCorrelationID: null
JMSDestination: queue:///QUEUE.NAME
JMSReplyTo: null
JMSRedelivered: false
JMSXAppID: demo.Application
JMSXDeliveryCount: 1
JMSXUserID: username
JMS_IBM_Character_Set: UTF-8
JMS_IBM_Encoding: 273
JMS_IBM_Format: MQSTR
JMS_IBM_MsgType: 8
JMS_IBM_PutApplType: 28
JMS_IBM_PutDate: 20200114
JMS_IBM_PutTime: 06093062
hello everyone new 1578982170555
2020-01-14 16:29:15.129 INFO 68468 --- [pool-2-thread-2] c.s.l.i.c.MqConsumerImpl : pool-2-thread-2 Message reading completed: hello everyone new 1578982170344
2020-01-14 16:29:16.180 INFO 68468 --- [ Thread-9] c.s.l.n.c.MqConsumerImpl : Listener for queue: XXXXXXXXX is shutdown
Process finished with exit code 1
Теперь, оно совершенно нормально для меня, чтобы увидеть это. Согласно весенним загрузочным классам jms, это происходит, и в этом случае должен вызываться откат сеанса, чтобы при перезапуске моего потребителя сообщение доставлялось. ЭТОГО НЕ ПРОИСХОДИТ. Я не получаю сообщение - hello everyone new 1578982170555
в следующий раз, когда я начал потребителя, и я получил следующее. Следовательно, сообщение теряется без обработки. Как я могу защитить это? Примечание. Как видно из журналов, при возникновении этого предупреждения метод onMessage () для этого метода не вызывался.