Каков наилучший способ опроса через равные промежутки времени потребителя Kafka при использовании kafka- python? - PullRequest
1 голос
/ 11 февраля 2020

У меня есть несколько производителей, которые подают данные в Кафку. Я sh хочу запускать Потребителя каждый час, чтобы сразу собрать все накопленные данные и обработать их.


Возможные варианты:

  • Используйте поток python и используйте эквивалент setInterval для вызова Consumer
  • Установка переменной max_poll_interval_ms : (как упоминалось в нескольких других ответах). Тем не менее, официальный документ гласит:

Это устанавливает верхнюю границу количества времени, в течение которого потребитель может бездействовать до получения большего количества записей. Если poll () не вызывается до истечения этого тайм-аута, то считается, что потребитель потерпел неудачу, и группа восстановит баланс. Это не похоже на то, что он отвечает за то, что усыпляет потребителя, а затем запускает его снова.

  • Вместо того, чтобы опрашивать каждый час, я отслеживаю смещение потребителя и опрашиваю после добавления 10 000 записей к Kafka

Однако я хочу управлять этим в самом потребителе. Каков же лучший способ?

Ответы [ 2 ]

1 голос
/ 11 февраля 2020

Используйте Cron или планировщик вашей ОС для вызова скрипта каждый час.

Если вам нужно подождать, пока 10k записей не появятся в topi c, чтобы сделать что-нибудь полезное, то я не совсем уверен, Кафка хорошо вписывается в эту архитектуру. Кроме того, потребительское отставание практически постоянно будет отставать

0 голосов
/ 12 февраля 2020

Если вы читаете официальный документ для max_poll_interval_ms, это максимальный интервал, в течение которого потребитель может бездействовать. После этого потребитель считается мертвым, и происходит перебалансировка группы потребителей.

Вот почему я бы посоветовал вам не закрывать потребителя после 10k. Хотя использование смещения для опроса является хорошей стратегией, с этим есть проблема. Каждое новое потребительское смещение не означает, что это новое сообщение. В зависимости от вашей конфигурации auto.offset.reset у вас могут быть повторяющиеся сообщения.

Чтобы сэкономить на запуске модуля pod, я бы посоветовал вам создать topi c с меньшим количеством разделов. Это может сэкономить на передаче данных + стоимость хранения. Хотя экземпляр должен быть запущен.

...