Python Кафка потребителя с офсетным управлением - PullRequest
2 голосов
/ 08 апреля 2020

Я новичок ie для Кафки и пытаюсь настроить потребителя в Кафке так, чтобы он читал сообщения, опубликованные Кафкой. Поправьте меня, если я ошибаюсь, как я понял, если потребительские магазины Kafka компенсируются в ZooKeeper? Тем не менее, у меня не запущен экземпляр zookeeper, и я хочу опрашивать, скажем, каждые 5 минут, чтобы увидеть, публикуются ли какие-либо новые сообщения.

Пока у меня есть код:

import logging
from django.conf import settings
import kafka
import sys
import json

bootstrap_servers = ['localhost:8080']
topicName = 'test-info'
consumer = kafka.KafkaConsumer (topicName, group_id = 'test',bootstrap_servers = 
bootstrap_servers,
auto_offset_reset = 'earliest')

count = 0
#print(consumer.topic)
try:
    for message in consumer:
        #print(type(message.value))
        print("\n")
        print("<>"*20)
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key, message.value))
        print("--"*20)
        info = json.loads(message.value)

        if info['event'] == "new_record" and info['data']['userId'] == "user1" and info['data']['details']['userTeam'] == "foo":
           count = count + 1
           print(count, info['data']['details']['team'], info['data']['details']['leadername'],info['data']['details']['category'])
        else:
            print("Skipping")

    print(count)


except KeyboardInterrupt:
    sys.exit()

Как я могу сохранить смещение таким образом, чтобы в следующий раз, когда оно опрашивает, оно считывало добавочные данные? Любые указатели помогут.

1 Ответ

3 голосов
/ 08 апреля 2020
  1. Это правда, что потребительские магазины Kafka компенсируют ZooKeeper. Так как у вас не установлен Zookeeper. Kafka, вероятно, использует свой встроенный zookeeper.

  2. , в вашем случае вам больше ничего не нужно делать, так как вы уже установили group_id, group_id = 'test'. следовательно, потребитель продолжит автоматически использовать данные из последнего смещения для указанной группы c. потому что он зафиксировал последнее смещение в zookeeper автоматически (auto_commit имеет значение True по умолчанию). Для получения дополнительной информации вы можете проверить здесь

  3. , если вы хотите проверять каждые 5 минут, чтобы увидеть, есть ли какие-либо новые сообщения, опубликованные, вы можете добавить time.sleep(300) в ваш потребитель для l oop.

...