Остановите прослушиватель Spring JMS, разрешив публикацию сообщений JMS - PullRequest
0 голосов
/ 22 января 2020

Я использую Spring JMS и хотел бы отключить извлечение сообщений из очереди, в то же время позволяя публиковать новые сообщения в тех же (или других) очередях из этих прослушивателей.

У меня есть метод @JmsListener, который извлекает сообщения из очереди и обрабатывает их. Этот слушатель на разных этапах будет публиковать sh новых сообщений, используя JmsTemplate.convertAndSend()

@JmsListener(destination = "queue-name")
public void processJob(Object job) {
    jmsTemplate.convertAndSend("begin");
    // Do some stuff
    jmsTemplate.convertAndSend("almost-done");
    // More stuff!
    jmsTemplate.convertAndSend("done");
}

У меня также есть очень простая фабрика слушателей:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory myFactory, MessageConverter jacksonJmsMessageConverter) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(myFactory);
    factory.setMessageConverter(jacksonJmsMessageConverter);
    return factory;
}

Теперь, что я want - остановить слушателя от извлечения сообщений из очереди, в то же время позволяя вызовам convertAndSend в методе успешно продолжаться.

Единственное, что я видел, чтобы остановить очередь, это вызывать jmsListenerEndpointRegistry.stop(); на бобе JmsListenerEndpointRegistry. Это остановит очередь должным образом, и никакие сообщения не будут извлечены. Однако, кажется, что остановка JmsListenerEndpointRegistry также приведет к сообщениям, которые я пытаюсь опубликовать sh с JmsTemplate.convertAndSend, чтобы не опубликовать sh.

Когда я пытаюсь convertAndSend сообщение После вызова jmsListenerEndpointRegistry.stop(); я получаю следующую ошибку:

o.a.activemq.ActiveMQSessionExecutor     : Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()?

Эта ошибка имеет смысл, я полагаю. Остановка реестра конечных точек не только отключает функциональность для слушателей, но также и функцию публикации сообщений.

Поэтому мой вопрос заключается в следующем: учитывая приведенный выше сценарий со слушателем, как я могу выключить слушатель так, чтобы новые сообщения не обрабатываются, но все еще допускается успешная публикация JmsTemplate.convertAndSend сообщений?

1 Ответ

1 голос
/ 22 января 2020

В вашем приложении / среде есть что-то необычное. Это работает нормально ...

@SpringBootApplication
public class So59863889Application {

    private static final Logger LOG = LoggerFactory.getLogger(So59863889Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So59863889Application.class, args);
    }

    @Autowired
    JmsTemplate template;

    @JmsListener(id = "foo", destination = "foo")
    public void listen1(String in) throws InterruptedException {
        LOG.info(in);
        Thread.sleep(5_000);
        this.template.convertAndSend("bar", in.toUpperCase());
    }

    @JmsListener(id = "bar", destination = "bar")
    public void listen2(String in) throws InterruptedException {
        LOG.info(in);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template, JmsListenerEndpointRegistry registry) {
        return args -> {
            IntStream.range(0, 20).forEach(i -> template.convertAndSend("foo", "test" + i));
            System.in.read();
            LOG.info("Stopping foo");
            registry.getListenerContainer("foo").stop();
        };
    }

}

Отправка после sleep() (после остановки контейнера работает нормально).

Я получаю тот же результат, когда переопределяю фабрику контейнеров Boot, но явная ссылка на ActiveMQConnectionFactory терпит неудачу для меня (такого бина нет); Я должен изменить его на просто ConnectionFactory.

Если вы можете предоставить простое приложение (например, мое), которое воспроизводит проблему, я могу посмотреть дальше.

...