KafkaConsumer с приложением Flask должен запускаться как задача демона - PullRequest
0 голосов
/ 21 февраля 2019

Мой сервер работает на Flask, у которого есть некоторые конечные точки API.Используя эти конечные точки API, я сохраняю экземпляр объекта, назовем его приложением.Поэтому я делаю операцию CRUD на модели приложения через API.Мне нужно получить данные с сервера Кафка и вставить в InfxDB (база данных временных рядов).На сервере kafka есть несколько тем, с которых я должен получать данные.Если вы увидите поведение Python KafkaConsumer, лучший способ - это длительный опрос, так что, как только данные поступят в какую-либо тему, они обработают этот пакет данных и сохранят в db.Основная проблема - я хочу динамически добавлять темы в kafkaConsumer.Например: если «topic_one» и «topic_two» уже подписаны через kafkaConsumer и если еще одно приложение было сохранено через API, я хочу добавить новую тему kafka в kafkaConsumer.

Так как мне это сделать??

Еще одна вещь: если я остановлю свой сервер и перезапущу его, чем для существующего приложения, он должен создать kafkaConsumer и начать слушать темы kafka.

Если я запускаю kafkaConsumer через потоки, что-тонапример:

import requests
import threading
import time
from flask import Flask
app = Flask(__name__)

def run_job(a=1):
    while True:
        print("Run recurring task %s"%a)
        time.sleep(3)

@app.before_first_request
def activate_job():
    run_job()
    thread = threading.Thread(target=run_job)
    thread.start()

if __name__ == "__main__":
    app.run(threaded=True)

run_job будет иметь реализацию kafkaConsumer, чем я не могу остановить существующий поток и обновить kafkaConsumer новыми темами?

К вашему сведению, у нас также может быть несколько потребителейодин потребитель будет слушать только одну тему кафки, но что, если количество тем увеличится, будут ли какие-либо проблемы?

...