В клиенте Kafka python я пытаюсь выполнить фиксацию вручную, но фиксация работает неправильно. Цель состоит в том, чтобы получить сообщения, а затем зафиксировать их только после некоторой обработки. Здесь я пытаюсь получить пакет из 5 сообщений одновременно и зафиксировать их одно за другим. Но потребитель фиксирует все сообщения, которые были отправлены в приложение потребителя от Kafka.
from kafka import KafkaConsumer
import kafka
consumer = KafkaConsumer(
bootstrap_servers='localhost:9092',
auto_offset_reset='latest',
group_id='3',
enable_auto_commit=True,
session_timeout_ms=10000)
consumer.subscribe(['mytopic'])
# simplest of pulls:
for msg in consumer:
print(msg)
Если я запусту это, потребитель выйдет, как только не будет входящих сообщений, и истечет время ожидания.
Может ли этот потребитель быть основан на событиях? Как в - он работает непрерывно и передается каждый раз, когда поступают новые сообщения, но не прерывается?