Kafka сообщение потеря из-за более позднего сообщения - PullRequest
0 голосов
/ 17 мая 2019

Итак, я получил несколько досадных компромиссов с моими потребителями кафки.Я использую 'kafka-node' для своего проекта.Я создал тему.Создано 2 потребителя в группе потребителей на 2 серверах.Для автоматической фиксации установлено значение false.Для каждого сообщения, получаемого моими потребителями, они запускают асинхронный процесс, который может занять от 1 до 20 секунд, когда процесс завершен, потребитель фиксирует смещение. Моя проблема: существует сценарий, в котором Потребитель 1 получает сообщение и занимает 20 секунд.обрабатывать.В середине процесса он получает другое сообщение, для обработки которого требуется 1 с.Он заканчивает обработку второго сообщения, фиксирует смещение, а затем сразу падает.Причинение сбоя предыдущей обработки сообщения.Если я запускаю потребителя, он не читает первое сообщение снова, потому что второе сообщение уже передало offsst, который больше первого.Как я могу избежать этого?

Kafkaconsumer.on('message', async(message)=>{
await SOMETHING_ASYNC_1~20SEC;
Kafkaconsumer.commit(()=>{});
});

1 Ответ

0 голосов
/ 17 мая 2019

В сущности, вы хотите ограничивать сообщения и обрабатывать параллелизм, используя async.queue.

  1. Создайте async.queue с процессором сообщений и параллелизмом одного (сам процессор сообщений обернут в setImmediate, чтобы он не заморозил цикл обработки событий)
  2. Установитеqueue.drain для возобновления работы с потребителем
  3. Обработчик для события message для потребителя приостанавливает работу потребителя и отправляет сообщение в очередь.

README-узел kafka подробнее это здесь .

Пример реализации, аналогичный вашей проблеме, можно найти здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...