У нас есть приложение Java / spring / tomcat, развернутое на виртуальной машине RHEL 7.0, которая использует AlejandroRivera / embedded-rabbitmq и запускает сервер Rabbitmq, как только развертывается война, и подключается к нему.У нас есть несколько очередей, которые мы используем для обработки и фильтрации событий.
Поток выглядит примерно так:
событие, которое мы получили -> опубликовать очередь событий -> класс прослушивателей фильтрует события -> опубликовать в другую очередь для обработки -> мы публикуем еще одну очередь для ведения журнала.
Проблема заключается в следующем:
- Обработка начинается нормально, мы можем видеть, что сообщения проходят через очереди,но через некоторое время класс слушателя перестает получать события.Кажется, мы смогли опубликовать его на канале RabbitMQ, но он никогда не выходил из очереди для слушателя.Кажется, это начинает ухудшаться, вызывая обработку событий через некоторое время, увеличиваясь до минут.Нагрузка не такая высокая, это около 200 событий, из которых мы заботимся только о нескольких из них.
То, что мы пробовали:
- Первоначально для очередей для предварительной выборки было установлено значение 1 , а для потребителей - мин. 2 имаксимум 5 , мы удалили предварительную выборку и добавили больше потребителей в качестве параметра максимального параллелизма, но проблема все еще существует, задержка просто занимает больше времени, но через несколько минут обработка начинает занимать около 20/30 секунд.
Мы видим в журналах, что мы опубликовали событие в очереди, и мы видим журнал, в котором мы получили его из очереди с задержкой.Поэтому в нашем коде посередине ничего не происходит, чтобы сгенерировать эту задержку.
Насколько мы можем судить, остальные очереди, похоже, обрабатывают сообщения должным образом, но именно эта попадает в этот застрявший режим..
Ошибки, которые я вижу, следующие, но я уверен, что это означает, и если это связано:
Jun 4 11:16:04 server: [pool-3-thread-10] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Consumer org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer@70dfa413 (amq.ctag-VaWc-hv-VwcUPh9mTQTj7A) method handleDelivery for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198) threw an exception for channel AMQChannel(amqp://agent@127.0.0.1:5672/,198)
Jun 4 11:16:04 server: java.io.IOException: Unknown consumerTag
Jun 4 11:16:04 server: at com.rabbitmq.client.impl.ChannelN.basicCancel(ChannelN.java:1266)
Jun 4 11:16:04 server: at sun.reflect.GeneratedMethodAccessor180.invoke(Unknown Source)
Jun 4 11:16:04 server: at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Jun 4 11:16:04 server: at java.lang.reflect.Method.invoke(Method.java:498)
Jun 4 11:16:04 server: at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:955)
Jun 4 11:16:04 server: at com.sun.proxy.$Proxy119.basicCancel(Unknown Source)
Jun 4 11:16:04 server: at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:846)
Jun 4 11:16:04 server: at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
Jun 4 11:16:04 server: at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
Jun 4 11:16:04 server: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Jun 4 11:16:04 server: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Jun 4 11:16:04 server: at java.lang.Thread.run(Thread.java:748)
Это происходит при закрытии приложения, но яЯ видел, как это происходило, пока приложение еще работало.
2018-06-05 13:22:45,443 ERROR CachingConnectionFactory$DefaultChannelCloseLogger - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 109, class-id=60, method-id=120)
Я не уверен, как устранить эти две ошибки и не связаны ли они.
Вот мой конфигурационный файл Spring:
<!-- Queues -->
<rabbit:queue id="monitorIncomingEventsQueue" name="MonitorIncomingEventsQueue"/>
<rabbit:queue id="interestingEventsQueue" name="InterestingEventsQueue"/>
<rabbit:queue id="textCallsEventsQueue" name="TextCallsEventsQueue"/>
<rabbit:queue id="callDisconnectedEventQueue" name="CallDisconnectedEventQueue"/>
<rabbit:queue id="incomingCallEventQueue" name="IncomingCallEventQueue"/>
<rabbit:queue id="eventLoggingQueue" name="EventLoggingQueue"/>
<!-- listeners -->
<bean id="monitorListener" class="com.example.rabbitmq.listeners.monitorListener"/>
<bean id="interestingEventsListener" class="com.example.rabbitmq.listeners.InterestingEventsListener"/>
<bean id="textCallsEventListener" class="com.example.rabbitmq.listeners.TextCallsEventListener"/>
<bean id="callDisconnectedEventListener" class="com.example.rabbitmq.listeners.CallDisconnectedEventListener"/>
<bean id="incomingCallEventListener" class="com.example.rabbitmq.listeners.IncomingCallEventListener"/>
<bean id="eventLoggingEventListener" class="com.example.rabbitmq.listeners.EventLoggingListener"/>
<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="40" acknowledge="none">
<rabbit:listener queues="interestingEventsQueue" ref="interestingEventsListener" method="handleIncomingMessage"/>
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
<rabbit:listener queues="textCallsEventsQueue" ref="textCallsEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="20" acknowledge="none">
<rabbit:listener queues="callDisconnectedEventQueue" ref="callDisconnectedEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="30" acknowledge="none">
<rabbit:listener queues="incomingCallEventQueue" ref="incomingCallEventListener" method="handleIncomingMessage"/>
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="1" max-concurrency="3" acknowledge="none">
<rabbit:listener queues="monitorIncomingEventsQueue" ref="monitorListener" method="handleIncomingMessage"/>
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory" message-converter="defaultMessageConverter" concurrency="5" max-concurrency="10" acknowledge="none">
<rabbit:listener queues="EventLoggingQueue" ref="eventLoggingEventListener" method="handleLoggingEvent"/>
</rabbit:listener-container>
<rabbit:connection-factory id="connectionFactory" host="${host.name}" port="${port.number}" username="${user.name}" password="${user.password}" connection-timeout="20000"/>
Я читал здесь, что задержка при обработке может быть вызвана проблемой сети, но в этом случае сервер и приложение находятся на одной виртуальной машине.Это заблокированная среда, поэтому большинство портов не открыты, но я сомневаюсь, что это не так.
Больше журналов: https://pastebin.com/4QMFDT7A
Любая помощь приветствуется,
Спасибо,