Перед публикацией сообщения код проверяет, что соединение RabbitMQ не закрыто перед фактической публикацией (см. Код вызова ниже).Тем не менее, кажется, что при проверке возникает странная ошибка TypeError.
Дополнительные сведения:
- Асинхронный потребитель (т. Е. Использует соединение select
pika.SelectConnection
) прослушивает определенныйqueue. - Когда потребитель обрабатывает сообщение, он публикует другие сообщения в другой очереди через не асинхронного производителя (то есть использует блокирующее соединение
pika.BlockingConnection
). - Потребитель и производительотдельные части кода, но лежат в одной и той же кодовой базе и выполняются в одном и том же процессе (и потоке) на одном и том же сервере.Они были разделены для лучшей развязки кода.
- Причина, по которой только потребитель является асинхронным, заключается в том, что вызов
rmq_consumer_connection.ioloop.start()
является блокирующей операцией (следовательно, не может сделать эквивалент rmq_producer_connection.ioloop.start()
).Я могу рассмотреть возможность оптимизации (например, использовать отдельные потоки для потребителя и производителя, чтобы оба они могли быть асинхронными) в будущем. - Использование pika 0.12.0, RabbitMQ 3.7.8, Python 3.5
Stacktrace следующим образом:
Apr 05 19:25:16 prod python[23326]: job|callback_user_queue|ERROR| Exception in callback_user_queue(..)
Apr 05 19:25:16 prod python[23326]: Traceback (most recent call last):
Apr 05 19:25:16 prod python[23326]: File "/opt/app/gateway_messenger.py", line 282, in send_to_gateway
Apr 05 19:25:16 prod python[23326]: gv.rmq_producer_connection.process_data_events()
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 751, in process_data_events
Apr 05 19:25:16 prod python[23326]: with _IoloopTimerContext(time_limit, self._impl) as timer:
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 188, in __enter__
Apr 05 19:25:16 prod python[23326]: self._callback_result.signal_once)
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/base_connection.py", line 112, in add_timeout
Apr 05 19:25:16 prod python[23326]: return self.ioloop.add_timeout(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 364, in add_timeout
Apr 05 19:25:16 prod python[23326]: return self._timer.call_later(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]: File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 199, in call_later
Apr 05 19:25:16 prod python[23326]: heapq.heappush(self._timeout_heap, timeout)
Apr 05 19:25:16 prod python[23326]: TypeError: heap argument must be a list
Телефонный код следующим образом:
def send_to_gateway(uid, data):
...(non-relevant code)...
try:
gv.rmq_producer_connection.process_data_events() <-- fails here
except pika.exceptions.ConnectionClosed:
logger.warn("RMQ producer connection is closed. Re-opening...")
initialise_producer()
...(non-relevant code)...
def initialise_producer():
logger.info("Initialising RabbitMQ producer...")
gv.rmq_producer_connection = pika.BlockingConnection(
pika.ConnectionParameters(gv.config['rmq_ip']))
gv.rmq_producer_channel = gv.rmq_producer_connection.channel()
gv.rmq_producer_channel.queue_declare(queue=gv.config['rmq_queue'],
durable=True)
logger.info("Initialising RabbitMQ producer done!")
Ожидаемый результат:
- Во время проверки, если соединение RabbitMQ закрыто, производитель будет повторно инициализирован перед публикацией сообщения.
- Если соединение не закрыто, перейдите к публикации сообщения в обычном режиме.