«TypeError: аргумент кучи должен быть списком», когда process_data_events () вызывается для соединения блокирования pika RabbitMQ - PullRequest
2 голосов
/ 08 апреля 2019

Перед публикацией сообщения код проверяет, что соединение RabbitMQ не закрыто перед фактической публикацией (см. Код вызова ниже).Тем не менее, кажется, что при проверке возникает странная ошибка TypeError.

Дополнительные сведения:

  • Асинхронный потребитель (т. Е. Использует соединение select pika.SelectConnection) прослушивает определенныйqueue.
  • Когда потребитель обрабатывает сообщение, он публикует другие сообщения в другой очереди через не асинхронного производителя (то есть использует блокирующее соединение pika.BlockingConnection).
  • Потребитель и производительотдельные части кода, но лежат в одной и той же кодовой базе и выполняются в одном и том же процессе (и потоке) на одном и том же сервере.Они были разделены для лучшей развязки кода.
  • Причина, по которой только потребитель является асинхронным, заключается в том, что вызов rmq_consumer_connection.ioloop.start() является блокирующей операцией (следовательно, не может сделать эквивалент rmq_producer_connection.ioloop.start()).Я могу рассмотреть возможность оптимизации (например, использовать отдельные потоки для потребителя и производителя, чтобы оба они могли быть асинхронными) в будущем.
  • Использование pika 0.12.0, RabbitMQ 3.7.8, Python 3.5

Stacktrace следующим образом:

Apr 05 19:25:16 prod python[23326]: job|callback_user_queue|ERROR| Exception in callback_user_queue(..)
Apr 05 19:25:16 prod python[23326]: Traceback (most recent call last):
Apr 05 19:25:16 prod python[23326]:   File "/opt/app/gateway_messenger.py", line 282, in send_to_gateway
Apr 05 19:25:16 prod python[23326]:     gv.rmq_producer_connection.process_data_events()
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 751, in process_data_events
Apr 05 19:25:16 prod python[23326]:     with _IoloopTimerContext(time_limit, self._impl) as timer:
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/blocking_connection.py", line 188, in __enter__
Apr 05 19:25:16 prod python[23326]:     self._callback_result.signal_once)
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/base_connection.py", line 112, in add_timeout
Apr 05 19:25:16 prod python[23326]:     return self.ioloop.add_timeout(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 364, in add_timeout
Apr 05 19:25:16 prod python[23326]:     return self._timer.call_later(deadline, callback_method)
Apr 05 19:25:16 prod python[23326]:   File "/home/prod/anaconda3/envs/venv/lib/python3.5/site-packages/pika/adapters/select_connection.py", line 199, in call_later
Apr 05 19:25:16 prod python[23326]:     heapq.heappush(self._timeout_heap, timeout)
Apr 05 19:25:16 prod python[23326]: TypeError: heap argument must be a list

Телефонный код следующим образом:

def send_to_gateway(uid, data):
...(non-relevant code)...
    try:
        gv.rmq_producer_connection.process_data_events() <-- fails here
    except pika.exceptions.ConnectionClosed:
        logger.warn("RMQ producer connection is closed. Re-opening...")
        initialise_producer()
...(non-relevant code)...


def initialise_producer():
    logger.info("Initialising RabbitMQ producer...")

    gv.rmq_producer_connection = pika.BlockingConnection(
        pika.ConnectionParameters(gv.config['rmq_ip']))
    gv.rmq_producer_channel = gv.rmq_producer_connection.channel()
    gv.rmq_producer_channel.queue_declare(queue=gv.config['rmq_queue'],
                                          durable=True)
    logger.info("Initialising RabbitMQ producer done!")

Ожидаемый результат:

  • Во время проверки, если соединение RabbitMQ закрыто, производитель будет повторно инициализирован перед публикацией сообщения.
  • Если соединение не закрыто, перейдите к публикации сообщения в обычном режиме.
...