Python kafka: есть ли способ заблокировать потребителя по теме kafka, пока не будет опубликовано новое сообщение? - PullRequest
0 голосов
/ 10 сентября 2018

У меня есть потребитель, подписавшийся на тестовую тему, где ветка продюсера регулярно публикует сообщения.Я хотел бы иметь возможность блокировать потребительский поток до тех пор, пока не появится новое сообщение, затем обработать его и снова начать ждать.Самое близкое, что я получил, это:

consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))

Есть ли более идиоматический способ (или есть какие-то серьезные проблемы с этим способом, которого я не вижу)?советы по этой конструкции очень приветствуются.Полный (работающий) пример:

import time
from threading import Thread

import kafka
from kafka import KafkaProducer, KafkaConsumer

print('python-kafka:', kafka.__version__)

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)

localhost_ = 'localhost:9092'

def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    j = 0
    while True:
        publish_message(_producer, topic_name, value=j, key=j)
        j += 1
        time.sleep(5)

if __name__ == '__main__':
    print('Running Producer..')
    topic_name = 'test'
    prod_thread = Thread(target=kafka_producer)
    prod_thread.start()
    consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                             bootstrap_servers=[localhost_],
                             api_version=(0, 10), consumer_timeout_ms=1000)
    # consumer.subscribe([topic_name])
    while True:
        print(consumer.poll(timeout_ms=5000))

python-kafka: 1.3.5

1 Ответ

0 голосов
/ 10 сентября 2018

Опрос в бесконечном цикле - это то, что предлагается в Кафка: полное руководство .Вот выдержка из Java Глава 4. Потребители Kafka: чтение данных из Kafka с использованием той же идеи:

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        ...
    }
}

Это хорошо отражает то, как библиотеки рекомендуется использовать в Python.

kafka-python (см. Полный пример в Повесть о двух клиентах Kafka )

from kafka import KafkaConsumer
...
kafka_consumer = Consumer(
...
)
consumer.subscribe([topic])

running = True
while running:
    message = kafka_consumer.poll()
...

confluent-kafka-python (см. полный пример в Введение в Apache Kafka для программистов на Python )

from confluent_kafka import Consumer, KafkaError
...
c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(0.1)
...

Еще один тесно связанный вопрос, который может возникнуть, заключается в том, как обрабатывать сообщения.

Эта часть вашего кода может зависеть от внешних зависимостей (базы данных, удаленные службы, сетевые файловые системы и т. Д.), Что может привести к неудачным попыткам обработки.

Так что может быть хорошей идеей реализовать логику повторения, вы можете найти хорошее описание того, как это будет выглядеть в посте блога Повторение пользовательской архитектуры в Apache Kafka .

...