Вы можете использовать функцию end_offsets(partitions)
в этом клиенте, чтобы получить последнее смещение для указанных разделов. Обратите внимание, что возвращаемым смещением является смещение next , то есть текущее окончание +1. Документация здесь.
Редактировать: Пример реализации:
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
BOOTSTRAP="""cluster:9092"""
API_KEY="""redacted"""
API_SECRET="""redacted"""
TOPIC="python-test"
consumer = KafkaConsumer(
group_id="my-group",
bootstrap_servers=[BOOTSTRAP],
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username=API_KEY,
sasl_plain_password=API_SECRET,
value_deserializer=lambda m: json.loads(m.decode('ascii')),
auto_offset_reset='earliest'
)
PARTITIONS = []
for partition in consumer.partitions_for_topic(TOPIC):
PARTITIONS.append(TopicPartition(TOPIC, partition))
partitions = consumer.end_offsets(PARTITIONS)
print(partitions)
и end_offsets
выглядит так:
{TopicPartition(topic=u'python-test', partition=0): 5,
TopicPartition(topic=u'python-test', partition=1): 20,
TopicPartition(topic=u'python-test', partition=2): 0}