Как потреблять кафку темы и разбирать / обслуживать по http? - PullRequest
0 голосов
/ 29 мая 2018

Я пытаюсь использовать тему кафки в python и обслуживать через http с помощью клиента prometheus, но я, похоже, заблокирован по теме потребления.Я добавил некоторые заполнители для простого добавления метрик, но похоже, что эта часть блокируется.

import os
from pykafka import KafkaClient
import threading
from kafka import KafkaConsumer
from prometheus_client import start_http_server, Metric, REGISTRY

class CustomCollector(threading.Thread):
    daemon = True

    def collect(self):
        client = KafkaClient(hosts=os.environ['KAFKA_ADDRESS'])
        topic = client.topics[b'os.environ['KAFKA_TOPIC']
        consumer = topic.get_simple_consumer()
        for message in consumer:
            if message is not None:
                print(message.value)

        metric = Metric('test_name', 'description', 'summary')
        metric.add_sample('test_name', 'description', 'summary')
        yield metric

if __name__ == '__main__':
    start_http_server(9998)
    REGISTRY.register(CustomCollector())
    while True: time.sleep(1)

Если я запускаю код, я вижу, что данные темы передаются на консоль, как и ожидалось.Однако моя конечная точка метрики никогда не заполняется, и любой запрос к веб-серверу просто зависает, пока я не убью приложение, на которое оно отвечает стандартными метриками из библиотеки.

1 Ответ

0 голосов
/ 06 июня 2018

Создание экземпляра Metric должно происходить один раз на каждое использованное сообщение.То есть вызов Metric() должен быть внутри цикла for message in consumer.Кроме того, вы, вероятно, хотите каким-то образом использовать message.value при создании экземпляра Metric.

...