Привет Требуется приостановить прослушивание rabbitmq от обработки сообщений во время изменения внутренних таблиц. Это изменение ограничено только моим приложением, поэтому я не хочу сбивать весь экземпляр rabbitmq. Когда процесс завершится, я хочу снова запустить слушателей.
Проблема, с которой я сталкиваюсь У меня есть 2 прослушивателя, подключенных к 2 отдельным очередям, совместно использующим 'customerconnectionFactory'. Когда я прервал соединение, только тот, у которого нет открытых каналов, был убит, а когда я возобновил соединение, я получил дополнительное соединение, которого раньше не было. Не могли бы вы помочь.
Я делюсь своими конфигами java ниже.
@Bean
public SimpleMessageListenerContainer auditMessageListenerContainer(AuditMessageListener auditMessageListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(messagingAuditQueue);
container.setMessageListener(auditMessageListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
@Bean
public SimpleMessageListenerContainer accessMessageListenerContainer(AccessLogListener accessLogListener)
{
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
container.setQueueNames(accessAuditQueue);
container.setMessageListener(accessLogListener);
container.setMaxConcurrentConsumers(5);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
container.setDefaultRequeueRejected(false);
container.setMissingQueuesFatal(false);
container.setForceCloseChannel(true);
container.setExclusive(false);
return container;
}
Вот как я сделал конфигурацию Java для Слушателей.
Ниже приведен RestController для запуска и остановки слушателей
@RestController
@RequestMapping(MESSAGE_AUDIT_ROOT)
public class RestartController {
@Autowired
private List<MessageListenerContainer> listenerContainers;
@Autowired
private List<ConnectionFactory> connectionFactories;
@GetMapping("/stop")
public String stopMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.resetConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.shutdown();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - stop";
}
@GetMapping("/start")
public String startMessageListenerContainer() {
connectionFactories.forEach(conFactory -> {
CachingConnectionFactory cConFactory = (CachingConnectionFactory) conFactory;
cConFactory.createConnection();
});
listenerContainers.forEach(container -> {
SimpleMessageListenerContainer smlc = (SimpleMessageListenerContainer) container;
smlc.start();
});
listenerContainers.forEach(container -> System.out
.println("Container: " + container.toString() + "is Running ?" + container.isRunning()));
return "done - start";
}
}
Ниже приведены изображения для поведения, которое я вижу локально. 1. Список начальных подключений ![Initial connections list](https://i.stack.imgur.com/l910E.png)
Когда соединение прервано. Звоните повторно.
2.1. Очередь соединения все еще активна
3. Когда начинается соединение, Rest Call ![enter image description here](https://i.stack.imgur.com/hJTUf.png)