Как получить последнее смещение от каждого раздела, используя kafka-python? - PullRequest
0 голосов
/ 24 апреля 2019

Я пытаюсь получить последнее смещение (не зафиксированное смещение) из каждого раздела для данной темы.

from kafka import KafkaConsumer, TopicPartition

topic = 'test-topic'
broker = 'localhost:9092'

consumer = KafkaConsumer(bootstrap_servers=broker)

tp = TopicPartition(topic, 0)        #1
consumer.assign([tp])                #2
consumer.seek_to_end(tp)             #3
last_offset = consumer.position(tp)  #4

for i in consumer.partitions_for_topic(topic):
    tp = TopicPartition(topic, i)
    consumer.assign([tp])
    consumer.seek_to_end(tp)
    last_offset = consumer.position(tp)
    print(last_offset)

Предыдущий код работает и печатает смещение каждого раздела.Тем не менее, обратите внимание, что у меня есть те же 4 строки вне цикла, а также внутри цикла.Если я удаляю любую из строк # 1 - # 4 (4 строки, непосредственно предшествующие циклу for), я получаю сообщение об ошибке: Файл "check_kafka_offset.py", строка 19, для i в consumer.partitions_for_topic (topic): TypeError: объект 'NoneType' не повторяется

Почему мне нужно иметь 4 строки перед циклом for?

1 Ответ

1 голос
/ 24 апреля 2019

Вы можете использовать функцию end_offsets(partitions) в этом клиенте, чтобы получить последнее смещение для указанных разделов. Обратите внимание, что возвращаемым смещением является смещение next , то есть текущее окончание +1. Документация здесь.

Редактировать: Пример реализации:

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

BOOTSTRAP="""cluster:9092"""
API_KEY="""redacted"""
API_SECRET="""redacted"""
TOPIC="python-test"

consumer = KafkaConsumer(
    group_id="my-group",
    bootstrap_servers=[BOOTSTRAP],
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=API_KEY,
    sasl_plain_password=API_SECRET,
    value_deserializer=lambda m: json.loads(m.decode('ascii')),
    auto_offset_reset='earliest'
)

PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
    PARTITIONS.append(TopicPartition(TOPIC, partition))

partitions = consumer.end_offsets(PARTITIONS)
print(partitions)

и end_offsets выглядит так:

{TopicPartition(topic=u'python-test', partition=0): 5,
 TopicPartition(topic=u'python-test', partition=1): 20,
 TopicPartition(topic=u'python-test', partition=2): 0}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...