Требовать, чтобы класс Listener для исключений при закрытии канала RabbitMQ происходил в приложениях Spring Stream - PullRequest
0 голосов
/ 19 февраля 2019

Мы используем приложения потока потоков данных весеннего облака и RabbitMQ в качестве посредника сообщений.

В общем потоке потоков от модулей источника к приемнику мы теряем данные, когда бы мы ни увидели «ChannelShutdown: ошибки соединения»в любом из модулей в данном поточном потоке.

Stream Example: Source | Transformer1 | transformer2 | transformer3 | sink

т.е. любой из каналов RabbitMQ подключается, были потеряны, тогда Приложения не смогли передать данные в следующие модули / Приложения, что приводит кпотеря данных.

Исключения:

2019-02-18 15:29:41.364 ERROR 94489 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: connection error; 
2019-02-18 15:29:42.008  INFO 94489 --- [strationQueue-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@6adc5b9c: tags=[{amq.ctag-5dNneAd3QgwWADta7JAmQQ=employeeRegistrations.employeeRegistrationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@a6e4897 Shared Rabbit Connection: SimpleConnection@22dc59b2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50775], acknowledgeMode=NONE local queue size=0
2019-02-18 15:29:42.010  INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2019-02-18 15:29:42.019  INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#d611f1c:1/SimpleConnection@1782b48a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50864]

Чтобы воспроизвести проблему: я запустил две программы Spring Cloud Stream

  1. Производитель - который отправляет 100 000 сообщенийна обмен RabbitMQ
  2. Потребитель - который является модулем приемника, получает полезную нагрузку из этой очереди [связана с обменом] и печатает

Для того, чтобы получить «отключение канала: ошибка соединения» в программе потребителялоги, я зашел на страницу пользовательского интерфейса rabbitMQ и постоянно удалял соединения, доступные на странице пользовательского интерфейса RabbitMQ.

enter image description here

наконец, в ходе этого процесса потребитель получил только 98 484 сообщений из 100 000 сообщений.Таким образом, мы потеряли данные при передаче из-за отключения подключения каналов


Мой вопрос:

Можем ли мы отловить или обнаружить «Отключение канала: ошибка подключения»"в приложениях Spring Stream?

Существует ли какой-либо класс слушателей RabbitMQ, доступный для включения в потоковое приложение для обработки ошибки" Отключение канала: ошибка соединения "?

Я сталкивался с прослушивателями RabbitMQ, такими как

, используя аннотацию @RabbitListener в приложении Stream

Example:
@RabbitListener(queues = TEST_QUEUE)
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

, но этот прослушиватель RabbitMQ прослушивает только указанные очереди или привязку, как указано в определении https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html

Я хотел бы знать, есть ли какой-нибудь общий класс слушателей RabbitMQ, который слушает соединения каналов, а не слушает определенную очередь.

Так что мой вопрос такой: есть ли слушатели, доступные дляпроверить все каналы [связанные с настоящим приложением] произошло отключение или нет, если я могу обработать потерю данных, отправивполезная нагрузка назад к следующему приложению после установления соединения по каналу.

Помогает ли мне SimpleRabbitListenerContainerFactory class в этой ситуации?Если это так, пожалуйста, дайте мне знать подход к решению этой проблемы потери данных из-за отключения канала и проблемы потери соединения.

Пример:

1 Ответ

0 голосов
/ 19 февраля 2019

Вы можете включить повторную попытку в производителе со свойствами Spring Boot. Документация здесь .

Прокрутите вниз до RabbitMQ

...
spring.rabbitmq.template.retry.enabled=false # Whether publishing retries are enabled.
spring.rabbitmq.template.retry.initial-interval=1000ms # Duration between the first and second attempt to deliver a message.
spring.rabbitmq.template.retry.max-attempts=3 # Maximum number of attempts to deliver a message.
spring.rabbitmq.template.retry.max-interval=10000ms # Maximum duration between attempts.
spring.rabbitmq.template.retry.multiplier=1 # Multiplier to apply to the previous retry interval.
...

Но, чтобы ответить на ваш вопрос, вы можете добавить ConnectionListener к определению компонента фабрики соединений.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...