Python Потребитель Kafka, читающий уже прочитанные сообщения - PullRequest
0 голосов
/ 20 июня 2020

Код потребителя Kafka -

def test():
TOPIC = "file_data"
producer = KafkaProducer()
producer.send(TOPIC, "data")
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest',
    consumer_timeout_ms=1000,
    group_id="Group2",
    enable_auto_commit=False,
    auto_commit_interval_ms=1000
)
topic_partition = TopicPartition(TOPIC, 0)
assigned_topic = [topic_partition]
consumer.assign(assigned_topic)
consumer.seek_to_beginning(topic_partition)
for message in consumer:
    print("%s key=%s value=%s" % (message.topic, message.key, message.value))
consumer.commit()

Ожидаемое поведение Он должен читать только последнее сообщение, написанное производителем. Он должен просто напечатать:

file_data key=None value=b'data'

Текущее поведение После запуска кода он напечатает:

file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'
file_data key=None value=b'data'

Ответы [ 2 ]

1 голос
/ 20 июня 2020
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka import KafkaProducer

def test():
    TOPIC = "file_data"
    producer = KafkaProducer()
    producer.send(TOPIC, b'data')
    consumer = KafkaConsumer(
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='latest',
        consumer_timeout_ms=1000,
        group_id="Group2",
        enable_auto_commit=False,
        auto_commit_interval_ms=1000
    )
    topic_partition = TopicPartition(TOPIC, 0)
    assigned_topic = [topic_partition]
    consumer.assign(assigned_topic)
    # consumer.seek_to_beginning(topic_partition)
    for message in consumer:
        print("%s key=%s value=%s" % (message.topic, message.key, message.value))
    consumer.commit()
test()

Это работает согласно вашим ожиданиям. Если вы хотите, чтобы он начинался с начала, то вызывайте только seekToBeginning

Ref: seek_to_beginning

0 голосов
/ 20 июня 2020

Вместо consumer.seek_to_beginning(topic_partition) нужно использовать consumer.seek_to_end(topic_partition).

Из документов:

Kafka позволяет указывать позицию с помощью seek (TopicPartition, long), чтобы указать новую позицию. Также доступны специальные методы поиска самого раннего и последнего смещения, поддерживаемого сервером (seekToBeginning (Collection) и seekToEnd (Collection) соответственно).

В простой форме:

consumer = KafkaConsumer('topicName', bootstrap_servers=[server], group_id=group_id, enable_auto_commit=True)
consumer.poll()
consumer.seek_to_end()

for message in consumer:
    print(message)

consumer.close()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...