Я создал новую топи c и написал функцию для приема сообщений:
consumer = KafkaConsumer(topic, bootstrap_servers=broker_list, auto_offset_reset='earliest', group_id='test', enable_auto_commit=True)
for msg in consumer:
print(msg.value)
time.sleep(1)
Я использую следующую функцию для проверки подтвержденного состояния каждого раздела:
def get_current_offset(topic, group):
consumer = KafkaConsumer(
bootstrap_servers=broker_list,
group_id=group,
enable_auto_commit=False
)
for p in consumer.partitions_for_topic(topic):
tp = TopicPartition(topic, p)
consumer.assign([tp])
committed = consumer.committed(tp)
consumer.seek_to_end(tp)
last_offset = consumer.position(tp)
if not committed:
committed = 0
print("topic: %s partition: %s committed: %s last: %s lag: %s" % (topic, p, committed, last_offset, (last_offset - committed)))
consumer.close(autocommit=False)
Проблема заключается в том, что всякий раз, когда я получаю сообщение, вывод get_current_offset
выглядит следующим образом:
topic: zhihu_comment partition: 0 committed: 0 last: 29199 lag: 29199
topic: zhihu_comment partition: 1 committed: 0 last: 29089 lag: 29089
topic: zhihu_comment partition: 2 committed: 0 last: 29149 lag: 29149
, что указывает на то, что не было зафиксировано ни одного сообщения. Что мне странно, так это то, что другие топи c, использованные таким же образом, могут фиксировать нормально.
topic: zhihu_profile partition: 0 committed: 699879 last: 699879 lag: 0
topic: zhihu_profile partition: 1 committed: 697061 last: 697061 lag: 0
topic: zhihu_profile partition: 2 committed: 697127 last: 697127 lag: 0
Что-то не так с топи c?
Обновление:
Я пытался использовать kafka-console-consumer --bootstrap-server device1:9092 --topic zhihu_comment --group test --from-beginning
для потребления, и сообщения передаются нормально!
Итак, я подозреваю, что это из-за проблемы с kafka-python
.