Как создать событие на основе Kafka Consumer в Python - PullRequest
0 голосов
/ 20 января 2020

В клиенте Kafka python я пытаюсь выполнить фиксацию вручную, но фиксация работает неправильно. Цель состоит в том, чтобы получить сообщения, а затем зафиксировать их только после некоторой обработки. Здесь я пытаюсь получить пакет из 5 сообщений одновременно и зафиксировать их одно за другим. Но потребитель фиксирует все сообщения, которые были отправлены в приложение потребителя от Kafka.

from kafka import KafkaConsumer
import kafka

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',
    group_id='3',
    enable_auto_commit=True,
    session_timeout_ms=10000)

consumer.subscribe(['mytopic'])

# simplest of pulls:
for msg in consumer:
    print(msg)

Если я запусту это, потребитель выйдет, как только не будет входящих сообщений, и истечет время ожидания.

Может ли этот потребитель быть основан на событиях? Как в - он работает непрерывно и передается каждый раз, когда поступают новые сообщения, но не прерывается?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...