Что я хочу сделать
У меня есть служба HTTP API, написанная на Flask, которая является шаблоном, используемым для создания экземпляров различных служб. Таким образом, этот шаблон должен быть обобщаемым для обработки сценариев использования, которые включают и не включают потребление Kafka.
Моя цель - иметь необязательный потребитель Kafka, работающий в фоновом режиме шаблона API. Я хочу, чтобы любая служба, которая нуждается в ней, могла асинхронно считывать данные из темы Kafka, а также независимо отвечать на запросы HTTP, как это обычно происходит. Эти два процесса (использование Kafka, обработка HTTP-запросов) не связаны, за исключением того, что они будут происходить под капотом одного и того же сервиса.
То, что я написал
Вот мойsetup:
# ./create_app.py
from flask_socketio import SocketIO
socketio = None
def create_app(kafka_consumer_too=False):
"""
Return a Flask app object, with or without a Kafka-ready SocketIO object as well
"""
app = Flask('my_service')
app.register_blueprint(special_http_handling_blueprint)
if kafka_consumer_too:
global socketio
socketio = SocketIO(app=app, message_queue='kafka://localhost:9092', channel='some_topic')
from .blueprints import kafka_consumption_blueprint
app.register_blueprint(kafka_consumption_blueprint)
return app, socketio
return app
Мой run.py такой:
# ./run.py
from . import create_app
app, socketio = create_app(kafka_consumer_too=True)
if __name__=="__main__":
socketio.run(app, debug=True)
А вот схема потребления Kafka, которую я написал, и вот где я думаю это должнообрабатывать события потока:
# ./blueprints/kafka_consumption_blueprint.py
from ..create_app import socketio
kafka_consumption_blueprint = Blueprint('kafka_consumption', __name__)
@socketio.on('message')
def handle_message(message):
print('received message: ' + message)
Что он в данный момент делает
С учетом вышеизложенного мои HTTP-запросы обрабатываются нормально, когда я сверну localhost:5000
. Проблема в том, что когда я пишу в тему some_topic
Kafka (на порту 9092), ничего не появляется. У меня есть клиент CLI Kafka, работающий в другой оболочке, и я вижу, что сообщения, которые я отправляю по этой теме , отображаются. Так что это приложение Flask не реагирует: сообщения не принимаются handle_message()
.
Что мне здесь не хватает? Заранее спасибо.