python Кафка не может отправить сообщение - PullRequest
1 голос
/ 09 февраля 2020

Я создал новую топи c и написал функцию для приема сообщений:

consumer = KafkaConsumer(topic, bootstrap_servers=broker_list, auto_offset_reset='earliest', group_id='test', enable_auto_commit=True)
for msg in consumer:
    print(msg.value)
    time.sleep(1)

Я использую следующую функцию для проверки подтвержденного состояния каждого раздела:

def get_current_offset(topic, group):

    consumer = KafkaConsumer(
            bootstrap_servers=broker_list,
            group_id=group,
            enable_auto_commit=False
        )   

    for p in consumer.partitions_for_topic(topic):
        tp = TopicPartition(topic, p)
        consumer.assign([tp])
        committed = consumer.committed(tp)
        consumer.seek_to_end(tp)
        last_offset = consumer.position(tp)

        if not committed:
            committed = 0 
        print("topic: %s partition: %s committed: %s last: %s lag: %s" % (topic, p, committed, last_offset, (last_offset - committed)))

    consumer.close(autocommit=False)

Проблема заключается в том, что всякий раз, когда я получаю сообщение, вывод get_current_offset выглядит следующим образом:

topic: zhihu_comment partition: 0 committed: 0 last: 29199 lag: 29199
topic: zhihu_comment partition: 1 committed: 0 last: 29089 lag: 29089
topic: zhihu_comment partition: 2 committed: 0 last: 29149 lag: 29149

, что указывает на то, что не было зафиксировано ни одного сообщения. Что мне странно, так это то, что другие топи c, использованные таким же образом, могут фиксировать нормально.

topic: zhihu_profile partition: 0 committed: 699879 last: 699879 lag: 0
topic: zhihu_profile partition: 1 committed: 697061 last: 697061 lag: 0
topic: zhihu_profile partition: 2 committed: 697127 last: 697127 lag: 0

Что-то не так с топи c?

Обновление:

Я пытался использовать kafka-console-consumer --bootstrap-server device1:9092 --topic zhihu_comment --group test --from-beginning для потребления, и сообщения передаются нормально!

Итак, я подозреваю, что это из-за проблемы с kafka-python.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...