Слушатель не извлекает данные после восстановления соединения с хостом - PullRequest
0 голосов
/ 02 марта 2020

Позвольте мне начать с того, что я только начал заниматься AMQP.

Я хочу использовать / извлекать данные из очереди. Я использую библиотеки Spring (spring-boot-starter-amqp) для упрощения работы. У меня есть класс слушателя с методом, аннотированным @RabbitListener, где я устанавливаю очередь. Все остальное настраивается через свойства:

    rabbitmq:
      username: user
      password: password
      virtual-host: virtual-host
      port: 5672
      host: host
      queue: _316_
      listener:
        simple:
          retry:
            enabled: true
            initial-interval: 1000
            max-attempts: 8
            max-interval: 10000
            multiplier: 2.0
            stateless: true

Все работает нормально, пока я не сделаю хост некоторое время недоступным. Когда это происходит, соединение разрывается, и предпринимаются попытки для его восстановления sh. После восстановления соединения слушатель не начинает получать сообщения. После перезапуска приложения все в порядке, но я уверен, что его можно каким-то образом настроить, чтобы потребитель продолжал перезапускать, по крайней мере, он должен попытаться сделать это после восстановления соединения (или, по крайней мере, это то, что я ожидал).

После того, как соединение было разорвано, в журналах можно найти следующее:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); Consumer@22ead351: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@22ead351: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)\n\tat java.base/java.lang.Thread.run(Thread.java:835)\nCaused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer","message":"Consumer received fatal exception on startup

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat 

Затем сделана попытка подключения, и мы находимся в oop:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED 

До соединение восстановлено:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/SimpleConnection@3931e0ad [delegate=amqp://user@host, localPort= 50574]

И больше ничего не происходит - сообщения не принимаются.

ОБНОВЛЕНИЕ : Выполнено предложение и включено ведение журнала отладки.

Когда приложение запускается, мы:

  1. запускаем контейнер слушателя
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.
  1. создаем соединение

  2. стартовый потребитель

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@3daf03d8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
создание канала и начало потребления
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://user@host,1)

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[B@4b817fae(byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])

Это продолжается до тех пор, пока не прервется соединение:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp:///user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

Канал отключается и пользователь каким-то образом удаляется, так как журнал говорит:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

Проблема с драйвером подключения, после которой выдается исключение:

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.support.ConsumerCancelledException: null\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)\n\tat java.base/java.lang.Thread.run(Thread.java:835

Потребитель вызывает исключение и сообщает, что обработка может перезапуститься

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it

Возникает повторение:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

Каналы закрываются:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)

Новый потребитель запускается:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

Мы пытаемся подключиться, что заканчивается сбоями WARN и AUTHENTICATION , (потому что в предыдущем журнале говорилось, что пользователь был удален?):

An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat

ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat

Потребитель, который пытался запустить:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup

И он (потребитель) отменяется:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

Канал закрыт, и контейнер останавливается:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

А затем контейнер закрывается:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container

Мы ждем, когда рабочие закончат работу sh, это успешно, затем мы пытаемся подключиться снова, тот же SOCKET_CLOSED регистрируется снова и снова.

Затем хост возвращается и соединение восстанавливается. Кэшированный канал Rabit создается и ничего не происходит.

Я бы предположил, что проблема в том, что контейнер был закрыт и никогда не возвращался к жизни, следовательно, нет потребителей.

ЧТО РАБОТАЛО :

Я создал класс с методом «прослушивания», который принимает ListenerContainerConsumerFailedEvent. Этот класс имеет RabbitListenerEndpointRegistry (бин, который удобно для меня создал Boot), и всякий раз, когда вызывается этот метод, я проверяю, работает ли listenerContainer, если нет, то я запускаю его (эта проверка, скорее всего, избыточна).

@EventListener
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
        if (!listenerContainer.isRunning()){
            listenerContainer.start();
        }
    }

1 Ответ

2 голосов
/ 04 марта 2020

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: ошибка аутентификации

FatalListenerStartupException

Сбои аутентификации считаются фатальными, и контейнер немедленно остановился; маловероятно, что такие ситуации будут исправлены автоматически.

Удаление пользователя, который в данный момент используется, является довольно необычным обстоятельством.

Вы можете использовать ApplicationListener bean или @EventListener метод для прослушайте ListenerContainerConsumerTerminatedEvent и попробуйте перезапустить контейнер через некоторое время.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...