У меня есть Flask веб-приложение, запущенное Gunicorn. Веб-приложение имеет несколько конечных точек, где оно получает JSON данные, обрабатывает их, а затем необходимо опубликовать sh обработанные данные в обмен RabbitMQ для других микросервисов, чтобы получить их для дальнейшей обработки. На стороне HTTP ответа не требуется.
У меня есть файл rabbit.py со следующим содержимым:
import pika
import json
import os
credentials = pika.PlainCredentials(os.environ['AMQP_USER'], os.environ['AMQP_USER'])
parameters = pika.ConnectionParameters(os.environ['AMQP_HOST'],
os.environ['AMQP_PORT'],
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange=os.environ['AMQP_EXCHANGE'],
exchange_type='fanout',
durable=True)
def publish(data):
channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
routing_key='',
body=json.dumps(data),
properties=pika.BasicProperties(delivery_mode = 2))
Из функции конечной точки Flask я затем выполню publish(data_object)
чтобы получить данные, опубликованные в RabbitMQ.
Проблема в том, что время ожидания соединения с RabbitMQ из-за не обработанных эхо-запросов
rabbitmq_1 | 2020-04-01 08:56:45.911 [info] <0.806.0> accepting AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672)
rabbitmq_1 | 2020-04-01 08:56:45.915 [info] <0.806.0> connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672): user 'guest' authenticated and granted access to vhost '/'
rabbitmq_1 | 2020-04-01 09:02:45.920 [error] <0.806.0> closing AMQP connection <0.806.0> (172.19.0.3:48856 -> 172.19.0.2:5672):
rabbitmq_1 | missed heartbeats from client, timeout: 60s
ingress-api_1 | [2020-04-01 09:06:16,835] ERROR in app: Exception on /ttn/v2 [POST]
ingress-api_1 | Traceback (most recent call last):
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2446, in wsgi_app
ingress-api_1 | response = self.full_dispatch_request()
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1951, in full_dispatch_request
ingress-api_1 | rv = self.handle_user_exception(e)
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1820, in handle_user_exception
ingress-api_1 | reraise(exc_type, exc_value, tb)
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
ingress-api_1 | raise value
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1949, in full_dispatch_request
ingress-api_1 | rv = self.dispatch_request()
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1935, in dispatch_request
ingress-api_1 | return self.view_functions[rule.endpoint](**req.view_args)
ingress-api_1 | File "/app/main.py", line 28, in ttn_v2_endpoint
ingress-api_1 | ttn.parse_ttn_http_post_v2(request.get_json())
ingress-api_1 | File "/app/ttn.py", line 72, in parse_ttn_http_post_v2
ingress-api_1 | rabbit.publish(parsed_object)
ingress-api_1 | File "/app/rabbit.py", line 18, in publish
ingress-api_1 | channel.basic_publish(exchange=os.environ['AMQP_EXCHANGE'],
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2248, in basic_publish
ingress-api_1 | self._flush_output()
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1336, in _flush_output
ingress-api_1 | self._connection._flush_output(lambda: self.is_closed, *waiters)
ingress-api_1 | File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
ingress-api_1 | raise self._closed_result.value.error
ingress-api_1 | pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')
Существуют некоторые сторонние расширения, которые приносят от кролика до flask. И затем есть пара предложений, как можно было бы реализовать это. Некоторые используют Celery.
Мне нужен простой и простой способ публикации данных в Rabbit. Мне не нужно подписываться на ответы. Могу ли я просто отключить пинг, чтобы он оставался в живых, или каким было бы лучшее и простое решение?