Мы используем приложения потока потоков данных весеннего облака и RabbitMQ в качестве посредника сообщений.
В общем потоке потоков от модулей источника к приемнику мы теряем данные, когда бы мы ни увидели «ChannelShutdown: ошибки соединения»в любом из модулей в данном поточном потоке.
Stream Example: Source | Transformer1 | transformer2 | transformer3 | sink
т.е. любой из каналов RabbitMQ подключается, были потеряны, тогда Приложения не смогли передать данные в следующие модули / Приложения, что приводит кпотеря данных.
Исключения:
2019-02-18 15:29:41.364 ERROR 94489 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error;
2019-02-18 15:29:42.008 INFO 94489 --- [strationQueue-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@6adc5b9c: tags=[{amq.ctag-5dNneAd3QgwWADta7JAmQQ=employeeRegistrations.employeeRegistrationQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@a6e4897 Shared Rabbit Connection: SimpleConnection@22dc59b2 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50775], acknowledgeMode=NONE local queue size=0
2019-02-18 15:29:42.010 INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2019-02-18 15:29:42.019 INFO 94489 --- [strationQueue-2] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#d611f1c:1/SimpleConnection@1782b48a [delegate=amqp://guest@127.0.0.1:5672/, localPort= 50864]
Чтобы воспроизвести проблему: я запустил две программы Spring Cloud Stream
- Производитель - который отправляет 100 000 сообщенийна обмен RabbitMQ
- Потребитель - который является модулем приемника, получает полезную нагрузку из этой очереди [связана с обменом] и печатает
Для того, чтобы получить «отключение канала: ошибка соединения» в программе потребителялоги, я зашел на страницу пользовательского интерфейса rabbitMQ и постоянно удалял соединения, доступные на странице пользовательского интерфейса RabbitMQ.
наконец, в ходе этого процесса потребитель получил только 98 484 сообщений из 100 000 сообщений.Таким образом, мы потеряли данные при передаче из-за отключения подключения каналов
Мой вопрос:
Можем ли мы отловить или обнаружить «Отключение канала: ошибка подключения»"в приложениях Spring Stream?
Существует ли какой-либо класс слушателей RabbitMQ, доступный для включения в потоковое приложение для обработки ошибки" Отключение канала: ошибка соединения "?
Я сталкивался с прослушивателями RabbitMQ, такими как
, используя аннотацию @RabbitListener в приложении Stream
Example:
@RabbitListener(queues = TEST_QUEUE)
public void handle(Foo in) {
logger.info("Received: " + in);
}
, но этот прослушиватель RabbitMQ прослушивает только указанные очереди или привязку, как указано в определении https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html
Я хотел бы знать, есть ли какой-нибудь общий класс слушателей RabbitMQ, который слушает соединения каналов, а не слушает определенную очередь.
Так что мой вопрос такой: есть ли слушатели, доступные дляпроверить все каналы [связанные с настоящим приложением] произошло отключение или нет, если я могу обработать потерю данных, отправивполезная нагрузка назад к следующему приложению после установления соединения по каналу.
Помогает ли мне SimpleRabbitListenerContainerFactory class в этой ситуации?Если это так, пожалуйста, дайте мне знать подход к решению этой проблемы потери данных из-за отключения канала и проблемы потери соединения.
Пример: