KafkaConsumer poll () понимание поведения - PullRequest
1 голос
/ 30 сентября 2019

Попытка понять (новичок в kafka), как работает цикл событий опроса в kafka.

Вариант использования: 25 записей по теме, максимальный размер опроса установлен на 5.

max.poll.interval.ms = 5000 //5 seconds by default max.poll.records = 5

Последовательность задач

  1. Опрос записей из темы.
  2. Обработка записей в цикле for.
  3. Некоторая обработка входа в системугде логика либо прошла бы, либо потерпела неудачу.
  4. Если логические проходы (со смещением) будут добавлены на карту.
  5. Тогда она будет зафиксирована с использованием вызова commitSync.
  6. Если произойдет сбой, цикл будет прерван, и какой бы ни был успех до того, как это будет зафиксировано. После этого начнется проблема.
  7. Следующий опрос просто продолжит работать партиями по 5 дажеожидается ли это после ошибки?

Что мы в основном ожидаем, так это то, что цикл прерывается и смещается до тех пор, пока логика сообщения процесса успеха не будет зафиксирована, тогда следующий опрос должен продолжиться с сообщения об ошибке.

Пример, 1-я партияопрашивать 5 опрошенных сообщений и 1,2 смещения успешно и зафиксировано, а затем 3-й сбой. Так что вызов опроса продолжает перемещаться к следующему пакету, например 5-10,10-15, если между ними возникнут какие-либо ошибки, мы ожидаем, что он остановится в этой точке и произойдет опросдолжен начинаться с 3 в первом случае, или если во 2-м пакете происходит сбой в 8, то следующий опрос должен начинаться с 8-го смещения, а не от настроек следующего максимального пакета опроса, которые в этом случае были бы равны 5. autocommit имеет значение false.

Я попытался найти это в документации, но без помощи.

попытался настроить это, но без помощи max.poll.interval.ms

1 Ответ

0 голосов
/ 30 сентября 2019

max.poll.interval.ms - это миллисекунды, а не секунды, поэтому должно быть 5000.

Как только записи были возвращены опросом (и смещения не зафиксированы), они не будут возвращены снова, если вы не перезапуститепотребителя или выполните seek() операции над потребителем, чтобы сбросить смещение на необработанные.

Проект Spring for Apache Kafka предоставляет SeekToCurrentErrorHandler для выполнения этой задачи.

Есливы сами пользуетесь потребителем (как это звучит), вы должны искать его.

...