Нужен синхронный производитель Кафки в приложении для колб с использованием Confluent Kafka - PullRequest
0 голосов
/ 04 октября 2018

Я пытаюсь запустить приложение Flask, которое использует Confluent Kafka для отправки сообщений на определенную тему.Когда я запускаю его как локальное приложение-колбу, оно работает нормально, но когда я запускаю его в контейнере Docker за nginx, оно не работает.Метод send работает, но когда я пытаюсь сделать его синхронным, вызывая метод flush, он "зависает".Глядя на вывод, кажется, что сброс порождает другой процесс.Я предполагаю, что причина, по которой он зависает, заключается в том, что флеш работает против другого экземпляра производителя.Я могу решить проблему, если я создаю экземпляр производителя внутри почтового блока.Однако для его создания требуется почти 100 мс, и это неприемлемо, поэтому возникает желание глобального производителя.Ниже приведен пример приложения для работы с колбой, в котором демонстрируется проблема.

from flask import Flask, request, jsonify
from flask_restful import Resource, Api, abort

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers':"kafka.nonprod:9092"})

# Create the app and api
app = Flask(__name__)
api = Api(app)

class Gateway(Resource):
    def post(self):
        try:
            message = "This is a test message"

            producer.produce("output_topic", message)
            producer.flush()

            resp = jsonify({"response": "Okay"})
            resp.status_code = 200
            return resp

        except Exception as exception:
            print(type(exception))
            print(exception.message)
            abort(400, data=request.data, message=exception.message)

api.add_resource(Gateway, '/')

Я также включаю вывод из образа Docker здесь:

    docker run --rm -p 80:80 gateway
/usr/lib/python2.7/site-packages/supervisor/options.py:298: UserWarning: Supervisord is running as root and it is searching for its configuration file in default locations (including its current working directory); you probably want to specify a "-c" argument specifying an absolute path to a configuration file for improved security.
  'Supervisord is running as root and it is searching '
2018-10-04 16:52:07,212 CRIT Supervisor running as root (no user in config file)
2018-10-04 16:52:07,213 INFO supervisord started with pid 1
2018-10-04 16:52:08,216 INFO spawned: 'nginx' with pid 10
2018-10-04 16:52:08,219 INFO spawned: 'uwsgi' with pid 11
[uWSGI] getting INI configuration from /etc/uwsgi/uwsgi.ini
*** Starting uWSGI 2.0.17 (64bit) on [Thu Oct  4 16:52:08 2018] ***
compiled with version: 6.4.0 on 27 March 2018 12:43:27
os: Linux-4.9.93-linuxkit-aufs #1 SMP Wed Jun 6 16:55:56 UTC 2018
nodename: 43e8ded405ee
machine: x86_64
clock source: unix
pcre jit disabled
detected number of CPU cores: 4
current working directory: /gateway
detected binary path: /usr/sbin/uwsgi
your memory page size is 4096 bytes
detected max file descriptor number: 1048576
lock engine: pthread robust mutexes
thunder lock: disabled (you can enable it with --thunder-lock)
uwsgi socket 0 bound to UNIX address /tmp/uwsgi.sock fd 3
setgid() to 101
set additional group 82 (www-data)
setuid() to 100
Python version: 2.7.15 (default, Aug 22 2018, 13:24:18)  [GCC 6.4.0]
Python main interpreter initialized at 0x7f7cce1de760
python threads support enabled
your server socket listen backlog is limited to 100 connections
your mercy for graceful operations on workers is 60 seconds
mapped 437520 bytes (427 KB) for 5 cores
*** Operational MODE: preforking ***
WSGI app 0 (mountpoint='') ready in 0 seconds on interpreter 0x7f7cce1de760 pid: 11 (default app)
*** uWSGI is running in multiple interpreter mode ***
spawned uWSGI master process (pid: 11)
spawned uWSGI worker 1 (pid: 19, cores: 1)
2018-10-04 16:52:09,322 INFO success: nginx entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2018-10-04 16:52:09,322 INFO success: uwsgi entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
spawned uWSGI worker 2 (pid: 26, cores: 1)

У меня была предыдущая версия моего приложенияэто работало с использованием библиотеки Apache Kafka.Я перехожу на библиотеки Confluent Kafka, чтобы использовать преимущества реестра схемы и всех предоставляемых им инструментов управления схемами.

...