Я прочитал много статей и официальных документов Kafka, но не смог понять, в чем проблема.
У меня есть потребительский код Kafka:
response_consumer = KafkaConsumer(<topic_name>, bootstrap_servers=<server_list>,
consumer_timeout_ms = 15000, auto_offset_reset='earliest')
result = []
for message in response_consumer :
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
result.append(message.value)
response_consumer.close()
Приведенный выше код работает с auto_offset_reset = 'самый ранний', но не auto_offset_reset = 'latest'. Под неработающим я подразумеваю, что я устанавливаю точку останова на l oop и отправляю сообщение, используя производителя:
- С auto_offset_reset = 'early' я получаю все сообщения вместе с большинством последнее сообщение в результате
- С auto_offset_reset = 'latest' я не получаю никакого сообщения в результате
Прочитал эту ветку, но не решил проблему: kafka- python потребитель не получает сообщения (используется group_id, не помогает)
Любая помощь приветствуется, спасибо.
Обновление: приведенный ниже код работает нормально (результат не читает все сообщения от начиная с auto_offset_reset = 'latest' и result1 содержит только последнее созданное сообщение):
response_consumer = KafkaConsumer(<topic_name>, bootstrap_servers=<server_list>, consumer_timeout_ms = 15000, auto_offset_reset='latest')
result = []
for message in response_consumer :
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
result.append(message.value)
//Send a new message via producer
result1 = []
for message in response_consumer :
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
result1.append(message.value)
response_consumer.close()