Существует ли Python API для потребителя событий, управляемого Кафкой? - PullRequest
0 голосов
/ 12 ноября 2018

Я пытался создать приложение Flask, в котором единственным интерфейсом был Kafka. По этой причине я хочу иметь получателя Kafka, который запускается, когда в потоке соответствующей темы появляется новое сообщение, и отвечать, отправляя сообщения обратно в поток Kafka.

Я искал что-то вроде реализации Spring:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Messasge in group mygroup: " + message);
}

Я смотрел на:

  1. Кафка-питон
  2. pykafka
  3. сливающийся-Кафка

Но я не смог найти ничего связанного с управляемым событиями стилем реализации в Python.

Ответы [ 2 ]

0 голосов
/ 14 ноября 2018

Вот реализация идеи, данной @ MickaelMaison's answer . Я использовал kafka-python .

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
# Poll kafka
    def poll():
        # Initialize consumer Instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
            print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
            kafka_listener(msg)
    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)

Опрос выполняется в другой ветке. После получения сообщения слушатель вызывается, передавая данные, полученные из Kafka.

0 голосов
/ 12 ноября 2018

Kafka Consumer должен непрерывно опрашивать, чтобы получить данные от брокеров.

Spring предоставляет вам этот модный API, но под прикрытием он вызывает опрос в цикле и вызывает ваш метод только после извлечения записей.

Вы можете легко создать нечто подобное с любым из упомянутых вами клиентов Python.Как и в Java, это не API, напрямую предоставляемый (большинством) клиентов Kafka, а нечто, предоставляемое слоем сверху.Это то, что вам нужно построить.

...