Flask-socketIO + Kafka как фоновый процесс - PullRequest
0 голосов
/ 23 октября 2019

Что я хочу сделать

У меня есть служба HTTP API, написанная на Flask, которая является шаблоном, используемым для создания экземпляров различных служб. Таким образом, этот шаблон должен быть обобщаемым для обработки сценариев использования, которые включают и не включают потребление Kafka.

Моя цель - иметь необязательный потребитель Kafka, работающий в фоновом режиме шаблона API. Я хочу, чтобы любая служба, которая нуждается в ней, могла асинхронно считывать данные из темы Kafka, а также независимо отвечать на запросы HTTP, как это обычно происходит. Эти два процесса (использование Kafka, обработка HTTP-запросов) не связаны, за исключением того, что они будут происходить под капотом одного и того же сервиса.

То, что я написал

Вот мойsetup:

# ./create_app.py

from flask_socketio import SocketIO

socketio = None

def create_app(kafka_consumer_too=False):
   """
   Return a Flask app object, with or without a Kafka-ready SocketIO object as well
   """
   app = Flask('my_service')
   app.register_blueprint(special_http_handling_blueprint)

   if kafka_consumer_too:
      global socketio
      socketio = SocketIO(app=app, message_queue='kafka://localhost:9092', channel='some_topic')
      from .blueprints import kafka_consumption_blueprint
      app.register_blueprint(kafka_consumption_blueprint)

      return app, socketio

  return app

Мой run.py такой:

# ./run.py
from . import create_app
app, socketio = create_app(kafka_consumer_too=True)
if __name__=="__main__":
  socketio.run(app, debug=True)

А вот схема потребления Kafka, которую я написал, и вот где я думаю это должнообрабатывать события потока:

# ./blueprints/kafka_consumption_blueprint.py

from ..create_app import socketio

kafka_consumption_blueprint = Blueprint('kafka_consumption', __name__)

@socketio.on('message')
def handle_message(message):
    print('received message: ' + message)

Что он в данный момент делает

С учетом вышеизложенного мои HTTP-запросы обрабатываются нормально, когда я сверну localhost:5000. Проблема в том, что когда я пишу в тему some_topic Kafka (на порту 9092), ничего не появляется. У меня есть клиент CLI Kafka, работающий в другой оболочке, и я вижу, что сообщения, которые я отправляю по этой теме , отображаются. Так что это приложение Flask не реагирует: сообщения не принимаются handle_message().

Что мне здесь не хватает? Заранее спасибо.

1 Ответ

1 голос
/ 24 октября 2019

Я думаю, что вы неправильно интерпретируете значение аргумента message_queue.

Этот аргумент используется, когда у вас есть несколько экземпляров сервера. Эти экземпляры взаимодействуют друг с другом через настроенную очередь сообщений. Эта очередь на 100% внутренняя, вы, пользователь библиотеки, ничего не можете сделать с очередью сообщений.

Если вы хотите создать какой-то механизм публикации / подписки, то вам нужно реализоватьслушатель этого в вашем приложении.

...