Как получить ConsumerRecord из KafkaConsumer.poll () в Python - PullRequest
0 голосов
/ 30 декабря 2018

Я использовал kafka-python для обработки сообщений в кластере kafka:

consumer = KafkaConsumer ('session', auto_offset_reset = 'Самое раннее']

, в то время как True:

   dict = consumer.poll(500)

   for d in dict:

     print d.topic, d.partition, d.value

Это даст ошибку «AttributeError: объект TopicPartition» не имеет атрибута «значение».

«dict» похож на это (из «print dict»)

{TopicPartition(topic=u'session', partition=0): [ConsumerRecord(topic=u'session', partition=0, offset=56, timestamp=None, timestamp_type=None, key=None, value='0000000000000000', headers=[], checksum=2855809697, serialized_key_size=-1, serialized_value_size=16, serialized_header_size=-1)]}

В каждом разделе может быть много разделов и сотни ConsumerRecord. Как правильно получить доступ к этим ConsumerRecord из consumer.poll ()? Заранее спасибо.

1 Ответ

0 голосов
/ 29 января 2019

у вас ошибка при использовании dict;По умолчанию «for d in dict:» означает «for d in dict.keys ():», поэтому вы можете только получить ключи этого dict.Попробуйте это:

dict = consumer.poll(500)
for key, value in dict.items():
    print(key)
    print()
    for record in value[:10]:
        print(record)
        print()

Это может исправить вашу ошибку.

...