Как ждать сообщения в нескольких очередях, используя py-amqplib - PullRequest
3 голосов
/ 27 ноября 2009

Я использую py-amqplib для доступа к RabbitMQ в Python. Приложение время от времени получает запросы на прослушивание определенных тем MQ.

При первом получении такого запроса он создает соединение AMQP и канал и запускает новый поток для прослушивания сообщений:

    connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
    channel = connection.channel()

    listener = AMQPListener(channel)
    listener.start()

AMQPListener очень просто:

class AMQPListener(threading.Thread):
    def __init__(self, channel):
        threading.Thread.__init__(self)
        self.__channel = channel

    def run(self):
        while True:
            self.__channel.wait()

После создания соединения он подписывается на интересующую тему, например:

channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)

def receive_callback(msg):
    self.queue.put(msg.body)

channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)

В первый раз все это работает нормально. Однако при последующем запросе подписаться на другую тему не удается. При последующих запросах я повторно использую соединение AMQP и поток AMQPListener (поскольку я не хочу создавать новый поток для каждой темы), а также при вызове блока кода над методом channel.queue_declare () звонок никогда не возвращается. Я также попытался создать новый канал в этот момент, и вызов connection.channel () также никогда не возвращается.

Единственный способ заставить его работать - это создать новое соединение, канал и поток слушателя по теме (т. Е. Routing_key), но это на самом деле не идеально. Я подозреваю, что это метод wait (), который каким-то образом блокирует все соединение, но я не уверен, что с этим делать. Конечно, я должен иметь возможность получать сообщения с несколькими ключами маршрутизации (или даже по нескольким каналам), используя один поток слушателя?

Смежный вопрос: как мне остановить ветку слушателя, когда эта тема больше не представляет интереса? Вызов channel.wait () блокируется навсегда, если нет сообщений. Единственный способ, о котором я могу подумать, - это отправить в очередь фиктивное сообщение, которое его «отравит», т.е. быть интерпретированным слушателем как сигнал к остановке.

1 Ответ

1 голос
/ 07 января 2010

Если вам нужно более одного потребителя на канал, просто подключите другой, используя basic_consume () и используйте channel.wait () после. Он будет прослушивать все очереди, подключенные через basic_consume () . Убедитесь, что вы определяете различные потребительские теги для каждого basic_consume () .

Используйте channel.basic_cancel (consumer_tag) , если вы хотите отменить определенного потребителя в очереди (отмена прослушивания определенной темы).

...