Kafka Python Потребитель, работающий с auto_offset_reset = 'early', но не auto_offset_reset = 'latest' - PullRequest
1 голос
/ 06 мая 2020

Я прочитал много статей и официальных документов Kafka, но не смог понять, в чем проблема.

У меня есть потребительский код Kafka:

response_consumer = KafkaConsumer(<topic_name>, bootstrap_servers=<server_list>,
                            consumer_timeout_ms = 15000, auto_offset_reset='earliest')
result = []
for message in response_consumer :
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
    result.append(message.value)
response_consumer.close()

Приведенный выше код работает с auto_offset_reset = 'самый ранний', но не auto_offset_reset = 'latest'. Под неработающим я подразумеваю, что я устанавливаю точку останова на l oop и отправляю сообщение, используя производителя:

  1. С auto_offset_reset = 'early' я получаю все сообщения вместе с большинством последнее сообщение в результате
  2. С auto_offset_reset = 'latest' я не получаю никакого сообщения в результате

Прочитал эту ветку, но не решил проблему: kafka- python потребитель не получает сообщения (используется group_id, не помогает)

Любая помощь приветствуется, спасибо.

Обновление: приведенный ниже код работает нормально (результат не читает все сообщения от начиная с auto_offset_reset = 'latest' и result1 содержит только последнее созданное сообщение):

    response_consumer = KafkaConsumer(<topic_name>, bootstrap_servers=<server_list>, consumer_timeout_ms = 15000, auto_offset_reset='latest')
    result = []
    for message in response_consumer :
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
        result.append(message.value)

//Send a new message via producer
    result1 = []
    for message in response_consumer :
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
        result1.append(message.value)
    response_consumer.close()
...