RabbitMQ / Pika блокирует потребителей с помощью Evenlet - PullRequest
0 голосов
/ 04 июля 2018

Я работаю над большим приложением, которое требует eventlet, а теперь также требует rabbitMQ. Похоже, что eventlet заставляет потребительский поток pika блокировать выполнение дополнительных рабочих. Я знаю, что Пика не считается поточно-ориентированным, поэтому у меня есть все, включая соединение, в его собственном потоке. Я бы предположил, что блокирующее соединение должно блокировать только поток потребителя. Как заставить пику и эвентлет работать вместе? В приведенном ниже примере рабочий поток никогда ничего не печатает, но комментирование eventlet.monkey_patch() позволяет выполнять оба потока.

import threading
import pika

import eventlet
eventlet.monkey_patch()


def callback(ch, method, properties, body):
    print body
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consumer():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test', durable=True,
                          exclusive=False, auto_delete=False)
    channel.basic_consume(callback, queue='test')
    channel.start_consuming()


def start_consumer_thread():
    # initialize a listener thread
    consumer_thread = threading.Thread(target=consumer)
    consumer_thread.start()

def worker():
    start_consumer_thread()
    for x in range(1,10000):
        print x


x = threading.Thread(target=worker())
x.start()

Ответы [ 2 ]

0 голосов
/ 21 августа 2018

Я смог заставить потребителя pika работать с eventlet путем внесения исправлений обезьянами как можно раньше, а также импортировать pika исправленным способом.

Сначала импортируйте и исправьте stdlib:

import eventlet
eventlet.monkey_patch()

Затем импортируйте и исправьте pika: 1010 *

pika = eventlet.import_patched('pika')

Я использовал эту стратегию импорта в сочетании с asynchronous_consumer_example: https://pika.readthedocs.io/en/stable/examples/asynchronous_consumer_example.html и использовал eventlet примитивы вместо threading для достижения неблокирующего потребителя.

0 голосов
/ 05 июля 2018

Пика и eventlet.monkey_patch не совместимы. Вам придется использовать eventlet без исправления системных вызовов, если это возможно.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы о StackOverflow.

...