Я настроил кластер RabbitMQ и использую Spring Cloud Stream поверх RabbitMQ с такой конфигурацией:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
public interface FileChangedSink {
String INPUT = "fileChanged";
@Input(INPUT)
SubscribableChannel fileChanged();
}
И application.yml
spring:
cloud:
stream:
bindings:
fileChanged:
destination: file.changed
binder: stream_rabbit
consumer:
max-attempts: 1
binders:
stream_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: ${RABBITMQ_HOST}
port: ${RABBITMQ_NODE_PORT_NUMBER}
username: ${RABBITMQ_DEFAULT_USER}
password: ${RABBITMQ_DEFAULT_PASS}
virtual-host: ${RABBITMQ_DEFAULT_VHOST}
Версии:
Spring Boot Version: 1.5.10.RELEASE
spring-cloud-starter-stream: 1.3.2.RELEASE
spring-cloud-starter-stream-rabbit: 1.3.3.RELEASE
Обычно этот конфиг работает нормально, но недавно я получил с ним такое исключение:
2019-10-28T08:11:51.832Z 2019-10-28 08:11:51.831 WARN [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: file.changed.anonymous.nL_BWxroRf65qW883QNAvQ
2019-10-28T08:11:56.835Z 2019-10-28 08:11:56.835 ERROR [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal=false exception on startup
2019-10-28T08:11:56.835Z org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:626)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1472)
2019-10-28T08:11:56.835Z at java.base/java.lang.Thread.run(Thread.java:844)
2019-10-28T08:11:56.835Z Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[file.changed.anonymous.nL_BWxroRf65qW883QNAvQ]
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:718)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:594)
2019-10-28T08:11:56.835Z ... 2 common frames omitted
2019-10-28T08:11:56.835Z Caused by: java.io.IOException: null
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:105)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:101)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:123)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:992)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:50)
2019-10-28T08:11:56.835Z at jdk.internal.reflect.GeneratedMethodAccessor1279.invoke(Unknown Source)
2019-10-28T08:11:56.835Z at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
2019-10-28T08:11:56.835Z at java.base/java.lang.reflect.Method.invoke(Method.java:564)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:980)
2019-10-28T08:11:56.835Z at com.sun.proxy.$Proxy228.queueDeclarePassive(Unknown Source)
2019-10-28T08:11:56.835Z at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:697)
2019-10-28T08:11:56.835Z ... 3 common frames omitted
2019-10-28T08:11:56.835Z Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'file.changed.anonymous.nL_BWxroRf65qW883QNAvQ' in vhost '/', class-id=50, method-id=10)
2019-10-28T08:11:56.835Z at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
2019-10-28T08:11:56.835Z at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
2019-10-28T08:11:56.835Z ... 11 common frames omitted
2019-10-28T08:11:56.835Z Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'file.changed.anonymous.nL_BWxroRf65qW883QNAvQ' in vhost '/', class-id=50, method-id=10)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:505)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:336)
2019-10-28T08:11:56.835Z at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:143)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:90)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:634)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
2019-10-28T08:11:56.836Z at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:572)
2019-10-28T08:11:56.836Z ... 1 common frames omitted
2019-10-28T08:11:56.836Z 2019-10-28 08:11:56.835 INFO [search-service,,,] 1 --- [qW883QNAvQ-5986] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@2fe6f292: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://adminRGfa93@172.31.71.134:5672/,10419), conn: Proxy@2f143ea Shared Rabbit Connection: SimpleConnection@1daf2e3e [delegate=amqp://adminRGfa93@172.31.71.134:5672/, localPort= 52338], acknowledgeMode=AUTO local queue size=0
2019-10-28T08:11:56.848Z 2019-10-28 08:11:56.848 WARN [search-service,,,] 1 --- [qW883QNAvQ-5987] o.s.a.r.listener.BlockingQueueConsumer : Failed to declare queue: file.changed.anonymous.nL_BWxroRf65qW883QNAvQ
2019-10-28T08:11:56.848Z 2019-10-28 08:11:56.848 WARN [search-service,,,] 1 --- [qW883QNAvQ-5987] o.s.a.r.listener.BlockingQueueConsumer : Queue declaration failed; retries left=3
2019-10-28T08:11:56.848Z org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[file.changed.anonymous.nL_BWxroRf65qW883QNAvQ]
Примечание: эта ошибка случайна, и обычно мой конфиг работает нормально.
Из ошибки я могу понять, что очередь не была создана до того, как поток попытался подключиться к ней.
Итак, мой вопрос: как это могло произойти и как я могу предотвратить такую ситуацию?