Spring-amqp - обработка сообщений задерживается - PullRequest
0 голосов
/ 06 июня 2018

У нас есть приложение Java / spring / tomcat, развернутое на виртуальной машине RHEL 7.0, которая использует AlejandroRivera / embedded-rabbitmq и запускает сервер Rabbitmq, как только развертывается война, и подключается к нему.У нас есть несколько очередей, которые мы используем для обработки и фильтрации событий.

Поток выглядит примерно так:

событие, которое мы получили -> опубликовать очередь событий -> класс прослушивателей фильтрует события -> опубликовать в другую очередь для обработки -> мы публикуем еще одну очередь для ведения журнала.

Проблема заключается в следующем:

  • Обработка начинается нормально, мы можем видеть, что сообщения проходят через очереди,но через некоторое время класс слушателя перестает получать события.Кажется, мы смогли опубликовать его на канале RabbitMQ, но он никогда не выходил из очереди для слушателя.Кажется, это начинает ухудшаться, вызывая обработку событий через некоторое время, увеличиваясь до минут.Нагрузка не такая высокая, это около 200 событий, из которых мы заботимся только о нескольких из них.

То, что мы пробовали:

  • Первоначально для очередей для предварительной выборки было установлено значение 1 , а для потребителей - мин. 2 имаксимум 5 , мы удалили предварительную выборку и добавили больше потребителей в качестве параметра максимального параллелизма, но проблема все еще существует, задержка просто занимает больше времени, но через несколько минут обработка начинает занимать около 20/30 секунд.

Мы видим в журналах, что мы опубликовали событие в очереди, и мы видим журнал, в котором мы получили его из очереди с задержкой.Поэтому в нашем коде посередине ничего не происходит, чтобы сгенерировать эту задержку.

Насколько мы можем судить, остальные очереди, похоже, обрабатывают сообщения должным образом, но именно эта попадает в этот застрявший режим..

Ошибки, которые я вижу, следующие, но я уверен, что это означает, и если это связано:

Jun  4 11:16:04  server: [pool-3-thread-10] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Consumer org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer@70dfa413 (amq.ctag-VaWc-hv-VwcUPh9mTQTj7A) method handleDelivery for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198) threw an exception for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198)
Jun  4 11:16:04  server: java.io.IOException: Unknown consumerTag
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1266)
Jun  4 11:16:04  server: at sun.reflect.GeneratedMethodAccessor180.invoke(Unknown Source)
Jun  4 11:16:04  server: at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun  4 11:16:04  server: at java.lang.reflect.Method.invoke(Method.java:498)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955)
Jun  4 11:16:04  server: at com.sun.proxy.$Proxy119.basicCancel(Unknown Source)
Jun  4 11:16:04  server: at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:846)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
Jun  4 11:16:04  server: at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Jun  4 11:16:04  server: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Jun  4 11:16:04  server: at java.lang.Thread.run(Thread.java:748)

Это происходит при закрытии приложения, но яЯ видел, как это происходило, пока приложение еще работало.

2018-06-05 13:22:45,443 ERROR CachingConnectionFactory$DefaultChannelCloseLogger - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 109, class-id=60, method-id=120)

Я не уверен, как устранить эти две ошибки и не связаны ли они.

Вот мой конфигурационный файл Spring:

<!-- Queues -->
<rabbit:queue id="monitorIncomingEventsQueue" name="MonitorIncomingEventsQueue"/>
<rabbit:queue id="interestingEventsQueue" name="InterestingEventsQueue"/>
<rabbit:queue id="textCallsEventsQueue" name="TextCallsEventsQueue"/>
<rabbit:queue id="callDisconnectedEventQueue" name="CallDisconnectedEventQueue"/>
<rabbit:queue id="incomingCallEventQueue" name="IncomingCallEventQueue"/>
<rabbit:queue id="eventLoggingQueue" name="EventLoggingQueue"/>

<!-- listeners -->
<bean id="monitorListener" class="com.example.rabbitmq.listeners.monitorListener"/>
<bean id="interestingEventsListener" class="com.example.rabbitmq.listeners.InterestingEventsListener"/>
<bean id="textCallsEventListener" class="com.example.rabbitmq.listeners.TextCallsEventListener"/>
<bean id="callDisconnectedEventListener" class="com.example.rabbitmq.listeners.CallDisconnectedEventListener"/>
<bean id="incomingCallEventListener" class="com.example.rabbitmq.listeners.IncomingCallEventListener"/>
<bean id="eventLoggingEventListener" class="com.example.rabbitmq.listeners.EventLoggingListener"/>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="40" acknowledge="none">
    <rabbit:listener queues="interestingEventsQueue" ref="interestingEventsListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="textCallsEventsQueue" ref="textCallsEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
    <rabbit:listener queues="callDisconnectedEventQueue" ref="callDisconnectedEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="30" acknowledge="none">
    <rabbit:listener queues="incomingCallEventQueue" ref="incomingCallEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="1" max-concurrency="3" acknowledge="none">
    <rabbit:listener queues="monitorIncomingEventsQueue" ref="monitorListener" method="handleIncomingMessage"/>
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="10"  acknowledge="none">
    <rabbit:listener queues="EventLoggingQueue" ref="eventLoggingEventListener" method="handleLoggingEvent"/>
</rabbit:listener-container>

<rabbit:connection-factory id="connectionFactory" host="${host.name}" port="${port.number}" username="${user.name}" password="${user.password}" connection-timeout="20000"/>

Я читал здесь, что задержка при обработке может быть вызвана проблемой сети, но в этом случае сервер и приложение находятся на одной виртуальной машине.Это заблокированная среда, поэтому большинство портов не открыты, но я сомневаюсь, что это не так.

Больше журналов: https://pastebin.com/4QMFDT7A

Любая помощь приветствуется,

Спасибо,

1 Ответ

0 голосов
/ 06 июня 2018

Мне нужно увидеть гораздо больше журналов, чем это - это пистолет для курения:

Storing...Storing delivery for Consumer@a2ce092: tags=[{}]

(Потребитель) tags пуст, что означает, что потребитель уже был отменен в то время (по какой-то причине, которая должна появиться ранее в журнале).

Если есть вероятность, что вы сможете воспроизвести с помощью 1.7.9.BUILD-SNAPSHOT, я добавил несколько журналов уровня TRACE, которые должны помочь в диагностике этого.

РЕДАКТИРОВАТЬ

В ответ на ваш недавний комментарий к rabbitmq-users ...

Можете ли вы попробовать с фиксированным параллелизмом?Переменный параллелизм в контейнере Spring AMQP часто не очень полезен, потому что потребители, как правило, будут сокращены, только если весь контейнер простаивает в течение некоторого времени.

Это может объяснить, однако, почему вы видите, что потребители отменяются.

Возможно, в этой логике есть / были некоторые расовые условия;использование фиксированного количества потребителей (не указывайте max ...) позволит избежать этого;если вы можете попробовать, это, по крайней мере, исключит это как возможность.

Тем не менее, я запутался (я не заметил этого в вашей конфигурации переполнения стека);с acknowledge="none" не следует отправлять подтверждения брокеру (NONE используется для установки autoAck)

String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), ...

и

public boolean isAutoAck() {

    return this == NONE;

}

Вы отправляете подтверждения с вашегокод?Если это так, режим подтверждения должен быть ручным.Я не вижу сценария, когда контейнер будет отправлять подтверждение для режима подтверждения NONE.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...