Spring Cloud Stream RabbitMQ случайно завершается с ошибкой очереди NOT_FOUND - PullRequest
0 голосов
/ 06 ноября 2019

Я настроил кластер RabbitMQ и использую Spring Cloud Stream поверх RabbitMQ с такой конфигурацией:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface FileChangedSink {
    String INPUT = "fileChanged";

    @Input(INPUT)
    SubscribableChannel fileChanged();
}

И application.yml

spring:
  cloud:
    stream:
      bindings:
        fileChanged:
          destination: file.changed
          binder: stream_rabbit
          consumer:
            max-attempts: 1

      binders:
        stream_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: ${RABBITMQ_HOST}
                port: ${RABBITMQ_NODE_PORT_NUMBER}
                username: ${RABBITMQ_DEFAULT_USER}
                password: ${RABBITMQ_DEFAULT_PASS}
                virtual-host: ${RABBITMQ_DEFAULT_VHOST}

Версии:

Spring Boot Version: 1.5.10.RELEASE
spring-cloud-starter-stream: 1.3.2.RELEASE
spring-cloud-starter-stream-rabbit: 1.3.3.RELEASE

Обычно этот конфиг работает нормально, но недавно я получил с ним такое исключение:

2019-10-28T08:11:51.832Z 2019-10-28 08:11:51.831  WARN [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: file.changed.anonymous.nL_BWxroRf65qW883QNAvQ
2019-10-28T08:11:56.835Z 2019-10-28 08:11:56.835 ERROR [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal=false exception on startup
2019-10-28T08:11:56.835Z org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
2019-10-28T08:11:56.835Z    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:626)
2019-10-28T08:11:56.835Z    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1472)
2019-10-28T08:11:56.835Z    at java.base/java.lang.Thread.run(Thread.java:844)
2019-10-28T08:11:56.835Z Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[file.changed.anonymous.nL_BWxroRf65qW883QNAvQ]
2019-10-28T08:11:56.835Z    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:718)
2019-10-28T08:11:56.835Z    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:594)
2019-10-28T08:11:56.835Z    ... 2 common frames omitted
2019-10-28T08:11:56.835Z Caused by: java.io.IOException: null
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50)
2019-10-28T08:11:56.835Z    at jdk.internal.reflect.GeneratedMethodAccessor1279.invoke(Unknown Source)
2019-10-28T08:11:56.835Z    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2019-10-28T08:11:56.835Z    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
2019-10-28T08:11:56.835Z    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980)
2019-10-28T08:11:56.835Z    at com.sun.proxy.$Proxy228.queueDeclarePassive(Unknown Source)
2019-10-28T08:11:56.835Z    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:697)
2019-10-28T08:11:56.835Z    ... 3 common frames omitted
2019-10-28T08:11:56.835Z Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'file.changed.anonymous.nL_BWxroRf65qW883QNAvQ' in vhost '/', class-id=50, method-id=10)
2019-10-28T08:11:56.835Z    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
2019-10-28T08:11:56.835Z    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
2019-10-28T08:11:56.835Z    ... 11 common frames omitted
2019-10-28T08:11:56.835Z Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'file.changed.anonymous.nL_BWxroRf65qW883QNAvQ' in vhost '/', class-id=50, method-id=10)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336)
2019-10-28T08:11:56.835Z    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143)
2019-10-28T08:11:56.836Z    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90)
2019-10-28T08:11:56.836Z    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
2019-10-28T08:11:56.836Z    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
2019-10-28T08:11:56.836Z    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
2019-10-28T08:11:56.836Z    ... 1 common frames omitted
2019-10-28T08:11:56.836Z 2019-10-28 08:11:56.835  INFO [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@2fe6f292: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://adminRGfa93@172.31.71.134:5672/,10419), conn: Proxy@2f143ea Shared Rabbit Connection: SimpleConnection@1daf2e3e [delegate=amqp://adminRGfa93@172.31.71.134:5672/, localPort= 52338], acknowledgeMode=AUTO local queue size=0
2019-10-28T08:11:56.848Z 2019-10-28 08:11:56.848  WARN [search-service,,,] 1 --- [qW883QNAvQ-5987] o.s.a.r.listener.BlockingQueueConsumer   : Failed to declare queue: file.changed.anonymous.nL_BWxroRf65qW883QNAvQ
2019-10-28T08:11:56.848Z 2019-10-28 08:11:56.848  WARN [search-service,,,] 1 --- [qW883QNAvQ-5987] o.s.a.r.listener.BlockingQueueConsumer   : Queue declaration failed; retries left=3
2019-10-28T08:11:56.848Z org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[file.changed.anonymous.nL_BWxroRf65qW883QNAvQ]

Примечание: эта ошибка случайна, и обычно мой конфиг работает нормально.

Из ошибки я могу понять, что очередь не была создана до того, как поток попытался подключиться к ней.

Итак, мой вопрос: как это могло произойти и как я могу предотвратить такую ​​ситуацию?

1 Ответ

2 голосов
/ 06 ноября 2019

Boot 1.5.x больше не поддерживается;последняя версия 1.5.22 была выпущена в августе.

Вам необходимо установить аргумент очереди x-queue-master-locator в client-local, чтобы обеспечить создание анонимной очереди на узле, к которому подключено приложение.

Начиная с версии spring-amqp 2.1 (используемой в Spring Boot 2.1 и связывателе 2.2), это выполняется автоматически фреймворком.

В более ранних версиях вы можете установить мастер-локатор для своего анонимногоочереди с использованием политики в брокере.

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit/issues/245

РЕДАКТИРОВАТЬ

Снимок экрана политики:

Screen shot of policy

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...