Publi sh в RabbitMQ от Flask - PullRequest
       7

Publi sh в RabbitMQ от Flask

1 голос
/ 01 апреля 2020

У меня есть Flask веб-приложение, запущенное Gunicorn. Веб-приложение имеет несколько конечных точек, где оно получает JSON данные, обрабатывает их, а затем необходимо опубликовать sh обработанные данные в обмен RabbitMQ для других микросервисов, чтобы получить их для дальнейшей обработки. На стороне HTTP ответа не требуется.

У меня есть файл rabbit.py со следующим содержимым:

import pika
import json
import os

credentials = pika.PlainCredentials(os.environ['AMQP_USER'], os.environ['AMQP_USER'])
parameters = pika.ConnectionParameters(os.environ['AMQP_HOST'],
                                       os.environ['AMQP_PORT'],
                                       '/',
                                       credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.exchange_declare(exchange=os.environ['AMQP_EXCHANGE'],
                         exchange_type='fanout',
                         durable=True)

def publish(data):
    channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
                        routing_key='',
                        body=json.dumps(data),
                        properties=pika.BasicProperties(delivery_mode = 2))

Из функции конечной точки Flask я затем выполню publish(data_object) чтобы получить данные, опубликованные в RabbitMQ.

Проблема в том, что время ожидания соединения с RabbitMQ из-за не обработанных эхо-запросов

rabbitmq_1     | 2020-04-01 08:56:45.911 [info] <0.806.0> accepting AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672)
rabbitmq_1     | 2020-04-01 08:56:45.915 [info] <0.806.0> connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
rabbitmq_1     | 2020-04-01 09:02:45.920 [error] <0.806.0> closing AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672):
rabbitmq_1     | missed heartbeats from client, timeout: 60s
ingress-api_1  | [2020-04-01 09:06:16,835] ERROR in app: Exception on /ttn/v2 [POST]
ingress-api_1  | Traceback (most recent call last):
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2446, in wsgi_app
ingress-api_1  |     response = self.full_dispatch_request()
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1951, in full_dispatch_request
ingress-api_1  |     rv = self.handle_user_exception(e)
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1820, in handle_user_exception
ingress-api_1  |     reraise(exc_type, exc_value, tb)
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
ingress-api_1  |     raise value
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1949, in full_dispatch_request
ingress-api_1  |     rv = self.dispatch_request()
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1935, in dispatch_request
ingress-api_1  |     return self.view_functions[rule.endpoint](**req.view_args)
ingress-api_1  |   File "/app/main.py", line 28, in ttn_v2_endpoint
ingress-api_1  |     ttn.parse_ttn_http_post_v2(request.get_json())
ingress-api_1  |   File "/app/ttn.py", line 72, in parse_ttn_http_post_v2
ingress-api_1  |     rabbit.publish(parsed_object)
ingress-api_1  |   File "/app/rabbit.py", line 18, in publish
ingress-api_1  |     channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
ingress-api_1  |     self._flush_output()
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
ingress-api_1  |     self._connection._flush_output(lambda: self.is_closed, *waiters)
ingress-api_1  |   File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
ingress-api_1  |     raise self._closed_result.value.error
ingress-api_1  | pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

Существуют некоторые сторонние расширения, которые приносят от кролика до flask. И затем есть пара предложений, как можно было бы реализовать это. Некоторые используют Celery.

Мне нужен простой и простой способ публикации данных в Rabbit. Мне не нужно подписываться на ответы. Могу ли я просто отключить пинг, чтобы он оставался в живых, или каким было бы лучшее и простое решение?

1 Ответ

0 голосов
/ 12 апреля 2020

Я получил это для работы с использованием Flask расширения: https://github.com/wdtinc/flask-pika

У него нет такой же ошибки подключения, поэтому, похоже, он работает как ожидалось.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...