Spring - Rabbitmq Listener должен быть приостановлен и возобновлен во время изменения базы данных - PullRequest
0 голосов
/ 09 октября 2019

Привет Требуется приостановить прослушивание rabbitmq от обработки сообщений во время изменения внутренних таблиц. Это изменение ограничено только моим приложением, поэтому я не хочу сбивать весь экземпляр rabbitmq. Когда процесс завершится, я хочу снова запустить слушателей.

Проблема, с которой я сталкиваюсь У меня есть 2 прослушивателя, подключенных к 2 отдельным очередям, совместно использующим 'customerconnectionFactory'. Когда я прервал соединение, только тот, у которого нет открытых каналов, был убит, а когда я возобновил соединение, я получил дополнительное соединение, которого раньше не было. Не могли бы вы помочь.

Я делюсь своими конфигами java ниже.

@Bean
    public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(messagingAuditQueue);
        container.setMessageListener(auditMessageListener);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setMissingQueuesFatal(false);
        container.setForceCloseChannel(true);
        container.setExclusive(false);
        return container;
    }
    @Bean
    public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
    {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(accessAuditQueue);
        container.setMessageListener(accessLogListener);
        container.setMaxConcurrentConsumers(5);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setMissingQueuesFatal(false);
        container.setForceCloseChannel(true);
        container.setExclusive(false);
        return container;
    }

Вот как я сделал конфигурацию Java для Слушателей.

Ниже приведен RestController для запуска и остановки слушателей

@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
    @Autowired
    private List<MessageListenerContainer> listenerContainers;

    @Autowired
    private List<ConnectionFactory> connectionFactories;

    @GetMapping("/stop")
    public String stopMessageListenerContainer() {
        connectionFactories.forEach(conFactory -> {
            CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
            cConFactory.resetConnection();
        });
        listenerContainers.forEach(container -> {
            SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
            smlc.shutdown();
        });
        listenerContainers.forEach(container -> System.out
                .println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
        return "done - stop";
    }

    @GetMapping("/start")
    public String startMessageListenerContainer() {
        connectionFactories.forEach(conFactory -> {
            CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
            cConFactory.createConnection();
        });
        listenerContainers.forEach(container -> {
            SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
            smlc.start();
        });
        listenerContainers.forEach(container -> System.out
                .println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
        return "done - start";
    }

}

Ниже приведены изображения для поведения, которое я вижу локально. 1. Список начальных подключений Initial connections list

Когда соединение прервано. Звоните повторно. enter image description here

2.1. Очередь соединения все еще активна enter image description here 3. Когда начинается соединение, Rest Call enter image description here

1 Ответ

1 голос
/ 09 октября 2019

При использовании режима кэширования по умолчанию (CHANNEL) всегда должно быть только одно соединение, если только вы не сконфигурируете RabbitTemplate с usePublisherConnection, установленным в значение true, в этом случае имя соединения будет api-audit.publisher.

Поскольку у вас есть две связи с именем api-audit, происходит нечто очень странное. Я подозреваю, что у вас как-то загружены две фабрики соединений, возможно, одна находится в контексте дочернего приложения? Вы не можете иметь два bean-компонента с одним и тем же именем в одном контексте приложения.

т.е. вы вызываете resetConnection на одном из них, но не на другом.

Я предлагаю вам поставитьточка останова в createConnection, чтобы увидеть, кто использует второй CF.

Кстати, вы действительно должны сбросить соединение после остановки контейнера;в противном случае контейнер перейдет в режим восстановления и может повторно открыть соединение в зависимости от времени.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...