Разъем Camel RabbitMQ читает тысячи сообщений перед их использованием - PullRequest
0 голосов
/ 17 октября 2019

В моем приложении мы используем маршрут Camel для чтения сообщений из очереди RabbitMQ. Конфигурация выглядит следующим образом:

from("rabbitmq:myexchange?routingKey=mykey&queue=q")

Производитель может отправить 50 тыс. Сообщений в течение нескольких минут, и каждое сообщение может занять 1 секунду или более.

Что я вижу, так эточто ВСЕ сообщения потребляются очень быстро, но обработка этих сообщений может занять много часов. Ожидается много часов обработки, но означает ли это, что 50k-сообщения хранятся в памяти? Если это так, я хотел бы отключить это поведение, потому что я не хочу терять сообщения, когда процесс останавливается ... На самом деле, мы теряем большинство сообщений, даже когда процесс продолжает работать, что еще хуже. Похоже, что соединитель не предназначен для одновременной обработки большого количества сообщений, но я не могу сказать, является ли это из-за самого соединителя или потому, что мы не настроили его должным образом.

Я попытался с опцией autoAck:

from("rabbitmq:myexchange?routingKey=mykey&queue=q&autoAck=false")

Таким образом, сообщения откатываются, когда что-то идет не так, но одновременное сохранение неподтвержденных сообщений 50k не кажется хорошей идеей ...

1 Ответ

2 голосов
/ 18 октября 2019

Есть несколько вещей, которыми я хотел бы поделиться.

  1. AutoAck - Да, если вы хотите обработать сообщение (после его получения), вам следует установить AutoAck наЛожь и явное подтверждение сообщения после его обработки.

  2. Настройка Consumer PreFetch - Вам необходимо точно настроить размер предварительной выборки, размер предварительной выборки - это максимальное числосообщений, которые RabbitMQ будет представлять потребителю на ходу, т. е. самое большее общее количество неподтвержденных сообщений будет равно размеру предварительной выборки. В зависимости от вашей системы, если каждое сообщение является критическим, вы можете установить размер предварительной выборки равным 1, если у вас есть многопоточная модель обработки сообщений, вы можете установить размер предварительной выборки, чтобы соответствовать количеству потоков, где каждый поток обрабатывает одно сообщение и аналогично.

В некотором смысле это архитектурно действует как буфер. Если во время обработки этого сообщения ваш процесс останавливается, любое сообщение, которое не было подтверждено до его остановки, все равно будет находиться в очереди, и потребитель получит его снова для обработки.

enter image description here

...