Опрос клиента Kafka () выдает EOFError после каждого полученного сообщения - PullRequest
0 голосов
/ 06 марта 2019

Я использую небольшой вариант примера кода опроса клиента, который я нашел в Confluent github :

c = Consumer({'bootstrap.servers':'localhost:9092','group.id':'devops','auto.offset.reset':'earliest'})
c.subscribe(['system-diskio-write-bytes','system-cpu-user-pct'])

try:
    while True:
        msg = c.poll(timeout=1000.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
        else:
            print('topic: %s key: %s value: %s' % (msg.topic(), msg.key(), msg.value()))

except KeyboardInterrupt:
    print('Polling interrupted by consumer')

EOF KafkaError вызывается после каждого полученного сообщения:

topic: system-diskio-write-bytes key: None value: b'{"route" : "system-diskio-write-bytes", "timestamp" : 2019-03-06T13:46:25.244, "value" : 655002980352.0}'
KafkaError{code=_PARTITION_EOF,val=-191,str="Broker: No more messages"}

Я не понимаю, почему это происходит - есть идеи, почему эта ошибка выдается, как исправить?Любые идеи очень ценятся - спасибо!

1 Ответ

0 голосов
/ 06 марта 2019

Хорошо, подробный ответ здесь .Объяснение выглядит следующим образом:

Событие EOF будет помещаться во внутреннюю очередь сообщений (обслуживаемую poll ()) каждый раз, когда потребитель достигает нового конечного смещения, независимо от того, не вызывает ли ваше приложение вызовpoll () в течение 10 секунд.

Событие EOF можно отключить следующим образом:

enable.partition.eof=false
...