Сообщение не восстановлено в DefaultMessageListenerContainer - PullRequest
0 голосов
/ 14 января 2020

У меня есть фабричный класс, который я использую для создания потребителя mq и bean-компонента DefaultMessageListenerContainer от этого потребителя, который будет использоваться для получения сообщений от topi c. Это выглядит так -

public class MQMessageFactory {

    public static DemoMessageConsumer createMessageConumer(Map<String, String> queueDetails, Map<String, String> sslDetails) throws Exception {
        MQConnectionFactory mqConnectionFactory = createMQConnectionFactory(queueDetails, sslDetails);
        return new DemoMessageConsumer(queueDetails, mqConnectionFactory);
    }

    public static MQConnectionFactory createMQConnectionFactory(Map<String, String> queue, Map<String, String> sslDetails) throws Exception {
        MQConnectionFactory cf = new MQConnectionFactory();
        try {
            cf.setHostName(queue.get("hostname"));
            cf.setPort(queue.get("port"));
            cf.setQueueManager(queue.get("queueManager"));
            cf.setChannel(queue.get("channel"));
            cf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            cf.setStringProperty(WMQConstants.USERID, queue.get("username"));
            cf.setStringProperty(WMQConstants.PASSWORD, queue.get("password"));
            cf.setSSLCipherSuite(queue.get("sslCipherSuite"));
            cf.setSSLSocketFactory(someMethodToCreateSSLContextFactory(sslDetails));
            return cf;
        } catch (JMSException e) {
            throw new RuntimeException("Unable to establish connection with host: " + queue.get("hostname"), e);
        }
    }
}
public class DemoMessageConsumer implements SessionAwareMessageListener {
    private static final Logger LOGGER = LogManager.getLogger(DemoMessageConsumer.class);

    private SingleConnectionFactory singleCf;
    private Map<String, String> properties;
    private DefaultMessageListenerContainer container;
    private Consumer<Message> messageProcessor;

    public DemoMessageConsumer(Map<String, String> properties, MQConnectionFactory connectionFactory) {
        this.singleCf = new SingleConnectionFactory(connectionFactory);
        this.singleCf.setReconnectOnException(true);
        this.singleCf.afterPropertiesSet();
        this.properties = properties;
    }

    public DefaultMessageListenerContainer listen(String queueName, Executor taskExecutor, Consumer<Message> messageProcessor) {
        this.messageProcessor = messageProcessor;
        this.container = new DefaultMessageListenerContainer();
        this.container.setConnectionFactory(singleCf);
        this.container.setDestinationName(queueName);
        // this.container.setAcceptMessagesWhileStopping(true);
        this.container.setSessionTransacted(true);
        this.container.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        this.container.setMessageListener(this);
        this.container.setConcurrentConsumers(5);
        this.container.setTaskExecutor(taskExecutor);
        this.container.afterPropertiesSet();
        this.container.start();
        LOGGER.info("Consumer started");
        return this.container;
    }

    @Override
    public void onMessage(Message message, Session session) {
        try {
            LOGGER.info("Message received with MessageID: {}", message.getJMSMessageID());
            this.messageProcessor.accept(message);
        } catch (JMSException e) {
            LOGGER.error("Error while processing the message", e);
        }
    }

    public void triggerShutdown() {
        LOGGER.info("Shutdown called");
        this.container.stop();
        while (this.container.isRunning()) ;
        this.container.shutdown();
        while (this.container.isActive()) ;
        this.singleCf.destroy();
        LOGGER.info("Listener is shutdown");
    }
}

Далее, у меня есть класс загрузочного проекта Spring, где я на самом деле создаю компоненты и использую их для прослушивания очереди -

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "ibmmq")
public class MqConsumerImpl implements ApplicationListener<ContextClosedEvent> {
    private static final Logger LOGGER = LogManager.getLogger(MqConsumerImpl.class);

    public Map<String, String> ssl;
    public Map<String, String> queue;

    @Lazy
    @Autowired
    @Qualifier("mqConsumer")
    private DemoMessageConsumer consumer;

    @Bean("mqConsumer")
    public DemoMessageConsumer createConsumer() throws Exception {
        return MQMessageFactory.createMessageConumer(queue, ssl);
    }

    @Bean("mqListener")
    public DefaultMessageListenerContainer listen() {
        return this.consumer.listen(queue.get("name"), Executors.newFixedThreadPool(3), message -> {
            try {
                LOGGER.info("{} Message reading started: {} ", Thread.currentThread().getName(), message.getBody(String.class));
                // My business logic goes here
                Thread.sleep(1000);
                LOGGER.info("{} Message reading completed: {} ", Thread.currentThread().getName(), message.getBody(String.class));
            } catch (Exception e) {
                LOGGER.error(e);
            }
        });
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent e) {
        this.consumer.triggerShutdown();
    }
}

Теперь я запускаю приложение, и сообщения правильно обрабатываются, и все идет хорошо. У меня есть TaskExecutor из трех потоков, и все они используются для потребления сообщений и выполнения задач.

Затем я запускаю команду, чтобы остановить приложение и один или все потоки, в которых выполнялась бизнес-задача, МОЖЕТ / НЕ МОЖЕТ выдать это предупреждение -

2020-01-14 16:29:15.110  WARN 68468 --- [pool-2-thread-3] o.s.j.l.DefaultMessageListenerContainer  : Rejecting received message because of the listener container having been stopped in the meantime: 
  JMSMessage class: jms_text
  JMSType:          null
  JMSDeliveryMode:  2
  JMSDeliveryDelay: 0
  JMSDeliveryTime:  0
  JMSExpiration:    0
  JMSPriority:      4
  JMSMessageID:     ID:414d5120484b49473033533120202020a2e4f05dce410b24
  JMSTimestamp:     1578982170556
  JMSCorrelationID: null
  JMSDestination:   queue:///QUEUE.NAME
  JMSReplyTo:       null
  JMSRedelivered:   false
    JMSXAppID: demo.Application          
    JMSXDeliveryCount: 1
    JMSXUserID: username     
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 273
    JMS_IBM_Format: MQSTR   
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 28
    JMS_IBM_PutDate: 20200114
    JMS_IBM_PutTime: 06093062
hello everyone new 1578982170555
2020-01-14 16:29:15.129  INFO 68468 --- [pool-2-thread-2] c.s.l.i.c.MqConsumerImpl       : pool-2-thread-2 Message reading completed: hello everyone new 1578982170344 
2020-01-14 16:29:16.180  INFO 68468 --- [       Thread-9] c.s.l.n.c.MqConsumerImpl       : Listener for queue: XXXXXXXXX is shutdown

Process finished with exit code 1

Теперь, оно совершенно нормально для меня, чтобы увидеть это. Согласно весенним загрузочным классам jms, это происходит, и в этом случае должен вызываться откат сеанса, чтобы при перезапуске моего потребителя сообщение доставлялось. ЭТОГО НЕ ПРОИСХОДИТ. Я не получаю сообщение - hello everyone new 1578982170555 в следующий раз, когда я начал потребителя, и я получил следующее. Следовательно, сообщение теряется без обработки. Как я могу защитить это? Примечание. Как видно из журналов, при возникновении этого предупреждения метод onMessage () для этого метода не вызывался.

...