Как вручную подтвердить сообщение от ActiveMQ, используя Spring JmsListener - PullRequest
0 голосов
/ 05 февраля 2020

Я использую ActiveMQ (с JMS) вместе с Spring JmsListener. Я могу получать сообщения из очереди ActiveMQ, но она использует AUTO_ACKNOWLEDGE. Как я могу установить CLIENT_ACKNOWLEDGE, чтобы я мог использовать другое сообщение только после этого подтверждения.

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setTransactedIndividualAck(true);
    activeMQConnectionFactory.setUserName(mqUserName);
    activeMQConnectionFactory.setPassword(mqPassword);
    activeMQConnectionFactory.setBrokerURL(mqUrl);
    return activeMQConnectionFactory;
}

@Bean
public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        logger.info("An error has occurred in the transaction");
        logger.error(t.getCause().getMessage());
    });

    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("4");

    // You could still override some of Boot's default if necessary.
    return factory;
}

@Bean
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
}

@JmsListener(destination = "QUEUE_1", containerFactory = "myFactory", concurrency = "2")
public void receiveImgGenerationMessage(String transaction) {
    logger.info("message received in queue " + transaction);
    //I will call other api to process the message and do some operation 
    //after the message is processed 
    //I have to Acknowledge the message is processed
    //so that i can consume the other message for process.
}

// jmsTemplate bean
public void sendmessage() {
    for (int i =0 ; i < 10 ; < i++) { 
        jmsTemplate.convertAndSend("QUEUE_1", i);
    }
}

1 Ответ

2 голосов
/ 06 февраля 2020

Вы должны использовать метод setSessionAcknowledgeMode в своем экземпляре org.springframework.jms.config.DefaultJmsListenerContainerFactory для установки режима CLIENT_ACKNOWLEDGE, например:

@Bean
public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        logger.info("An error has occurred in the transaction");
        logger.error(t.getCause().getMessage());
    });

    factory.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE);

    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("4");

    // You could still override some of Boot's default if necessary.
    return factory;
}

Это обсуждается в Spring JMS JavaDo c:

Контейнер слушателя предлагает следующие параметры подтверждения сообщения:

  • для sessionAcknowledgeMode установлено значение «AUTO_ACKNOWLEDGE» (по умолчанию) ): Этот режим зависит от контейнера: для DefaultMessageListenerContainer это означает автоматическое c подтверждение сообщения перед выполнением прослушивателя, без повторной доставки в случае исключения и без повторной доставки в случай других прерываний выполнения слушателя также. Для SimpleMessageListenerContainer это означает автоматическое c подтверждение сообщения после выполнения прослушивателя, без повторной доставки в случае выданного пользовательского исключения, но потенциальной повторной доставки в случае смерти JVM во время прослушивателя выполнение. Чтобы последовательно организовать повторную доставку с любым вариантом контейнера, рассмотрите режим «CLIENT_ACKNOWLEDGE» или - предпочтительно - установив вместо «sessionTransacted» значение «true».

  • «sessionAcknowledgeMode» установлен в «DUPS_OK_ACKNOWLEDGE» ": Ленивое подтверждение сообщения во время (DefaultMessageListenerContainer) или вскоре после (SimpleMessageListenerContainer) выполнения слушателя; нет повторной доставки в случае пользовательского исключения, но возможна повторная доставка в случае смерти JVM во время выполнения слушателя. Чтобы последовательно организовать повторную доставку с любым вариантом контейнера, рассмотрите режим «CLIENT_ACKNOWLEDGE» или - предпочтительно - вместо установки «sessionTransacted» в «true».

  • «sessionAcknowledgeMode» установлен в «CLIENT_ACKNOWLEDGE» ": Автоматическое c подтверждение сообщения после успешного выполнения слушателя; наилучшая попытка доставки в случае возникновения пользовательского исключения, а также в случае других прерываний выполнения слушателя (таких как умирание JVM).

  • "sessionTransacted" установлен в "true": Подтверждение транзакции после успешного выполнения слушателя; гарантированная повторная доставка в случае возникновения пользовательского исключения, а также в случае других прерываний выполнения слушателя (например, смерти JVM).

Вы можете также используйте это в вашем Spring Boot application.properties:

spring.jms.listener.acknowledge-mode=CLIENT
...