Мой сервер работает на Flask, у которого есть некоторые конечные точки API.Используя эти конечные точки API, я сохраняю экземпляр объекта, назовем его приложением.Поэтому я делаю операцию CRUD на модели приложения через API.Мне нужно получить данные с сервера Кафка и вставить в InfxDB (база данных временных рядов).На сервере kafka есть несколько тем, с которых я должен получать данные.Если вы увидите поведение Python KafkaConsumer, лучший способ - это длительный опрос, так что, как только данные поступят в какую-либо тему, они обработают этот пакет данных и сохранят в db.Основная проблема - я хочу динамически добавлять темы в kafkaConsumer.Например: если «topic_one» и «topic_two» уже подписаны через kafkaConsumer и если еще одно приложение было сохранено через API, я хочу добавить новую тему kafka в kafkaConsumer.
Так как мне это сделать??
Еще одна вещь: если я остановлю свой сервер и перезапущу его, чем для существующего приложения, он должен создать kafkaConsumer и начать слушать темы kafka.
Если я запускаю kafkaConsumer через потоки, что-тонапример:
import requests
import threading
import time
from flask import Flask
app = Flask(__name__)
def run_job(a=1):
while True:
print("Run recurring task %s"%a)
time.sleep(3)
@app.before_first_request
def activate_job():
run_job()
thread = threading.Thread(target=run_job)
thread.start()
if __name__ == "__main__":
app.run(threaded=True)
run_job будет иметь реализацию kafkaConsumer, чем я не могу остановить существующий поток и обновить kafkaConsumer новыми темами?
К вашему сведению, у нас также может быть несколько потребителейодин потребитель будет слушать только одну тему кафки, но что, если количество тем увеличится, будут ли какие-либо проблемы?