Spring AMQP RabbitListener настраивает конфигурацию автоматического переподключения - PullRequest
0 голосов
/ 27 мая 2020

Я пытаюсь лучше понять поведение RabbitListener, когда брокер выходит из строя и когда его восстанавливают. Я установил кролика в контейнере docker поверх localhost: 6672 и создал простое приложение для весенней загрузки для прослушивания сообщений. Также настроен application.yml для указания на этого кролика.

        @RabbitListener(containerFactory = "myListenerContainerFactory", 
                bindings = @QueueBinding(value = @Queue(value = "MY_QUEUE"), exchange = @Exchange(value = "MY_EXCHANGE"), key = "MY_KEY"))
        public void onMessage(final Mesg message) {
            //some handling
        }

Также определена фабрика контейнеров ниже

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory cf) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setDefaultRequeueRejected(false);
        factory.setConsumerTagStrategy(q -> "My App");
        return factory;
    }

Я запустил приложение весенней загрузки, когда контейнер кролика работал, и он смог автоматически создать обмен / очередь и мог принимать сообщения, которые я отправлял через консоль администратора кролика. Я принудительно удалил контейнер с кроликом (docker rm my_instance -f), пока работало приложение весенней загрузки, и оно начало печатать следующее сообщение.

2020-05-26 17:03:15.097 ERROR 26816 --- [ 127.0.0.1:6672]  o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error
2020-05-26 17:03:15.614  INFO 26816 --- [cTaskExecutor-1]  o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@69176296: tags=[[My App]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:6672/,1), conn: Proxy@522c56e8 Shared Rabbit Connection: SimpleConnection@7578dfd0 [delegate=amqp://guest@127.0.0.1:6672/, localPort= 65342], acknowledgeMode=AUTO local queue size=0
2020-05-26 17:03:15.618  INFO 26816 --- [cTaskExecutor-2]  o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:6672
2020-05-26 17:03:22.662  WARN 26816 --- [cTaskExecutor-2]  o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2020-05-26 17:03:22.663  INFO 26816 --- [cTaskExecutor-2]  o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@31978e0c: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2020-05-26 17:03:22.663  INFO 26816 --- [cTaskExecutor-3]  o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:6672
2020-05-26 17:03:29.683  WARN 26816 --- [cTaskExecutor-3]  o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

После повторного запуска контейнера с кроликом приложение весенней загрузки, похоже, обнаружило его однако он напечатал следующее сообщение, и я не думаю, что он установил успешное соединение. Глядя на журналы, кажется, что по умолчанию установлено 3 повторных попытки. Есть ли конфигурация, которую я могу использовать для увеличения лимита?

Есть ли какие-либо лучшие методы обработки таких случаев? Ожидается, что мое приложение будет подключаться к различным кластерам кроликов, и возможно, что 1 или другой может go при обслуживании, и мне нужен был способ автоматического повторного подключения, как только кластер кроликов будет восстановлен.

2020-05-26 17:05:18.303  INFO 26816 --- [TaskExecutor-19]  o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@5053bc3c: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2020-05-26 17:05:18.306  INFO 26816 --- [TaskExecutor-20]  o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: localhost:6672
2020-05-26 17:05:18.348  INFO 26816 --- [TaskExecutor-20]  o.s.a.r.c.CachingConnectionFactory       : Created new connection: myConnectionFactory#ebfe707:19/SimpleConnection@19d03268 [delegate=amqp://guest@127.0.0.1:6672/, localPort= 49260]
2020-05-26 17:05:18.396  WARN 26816 --- [TaskExecutor-20]  o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: MY_QUEUE
2020-05-26 17:05:18.402  WARN 26816 --- [TaskExecutor-20]  o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[MY_QUEUE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52) ~[amqp-client-5.4.3.jar:5.4.3]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at com.sun.proxy.$Proxy283.queueDeclarePassive(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689) ~[spring-rabbit-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_QUEUE' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ~[amqp-client-5.4.3.jar:5.4.3]
    ... 14 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_QUEUE' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48) ~[amqp-client-5.4.3.jar:5.4.3]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597) ~[amqp-client-5.4.3.jar:5.4.3]
    ... 1 common frames omitted

2020-05-26 17:05:23.420  WARN 26816 --- [TaskExecutor-20]  o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: MY_QUEUE
2020-05-26 17:05:23.425  WARN 26816 --- [TaskExecutor-20]  o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=2

1 Ответ

2 голосов
/ 27 мая 2020
 reply-text=NOT_FOUND - no queue 'MY_QUEUE' in vhost '/'

У вас есть MY_QUEUE, который объявлен как auto-delete и выглядит так, как будто во время установления соединения его воссоздание не происходит. Подумайте о том, чтобы использовать его как bean-компонент и позволить RabbitAdmin обрабатывать его создание при повторном подключении.

См. Дополнительную информацию в документации: https://docs.spring.io/spring-amqp/docs/2.2.7.RELEASE/reference/html/#broker -configuration

...