Я использую py-amqplib для доступа к RabbitMQ в Python. Приложение время от времени получает запросы на прослушивание определенных тем MQ.
При первом получении такого запроса он создает соединение AMQP и канал и запускает новый поток для прослушивания сообщений:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener очень просто:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
После создания соединения он подписывается на интересующую тему, например:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
В первый раз все это работает нормально. Однако при последующем запросе подписаться на другую тему не удается. При последующих запросах я повторно использую соединение AMQP и поток AMQPListener (поскольку я не хочу создавать новый поток для каждой темы), а также при вызове блока кода над методом channel.queue_declare () звонок никогда не возвращается. Я также попытался создать новый канал в этот момент, и вызов connection.channel () также никогда не возвращается.
Единственный способ заставить его работать - это создать новое соединение, канал и поток слушателя по теме (т. Е. Routing_key), но это на самом деле не идеально. Я подозреваю, что это метод wait (), который каким-то образом блокирует все соединение, но я не уверен, что с этим делать. Конечно, я должен иметь возможность получать сообщения с несколькими ключами маршрутизации (или даже по нескольким каналам), используя один поток слушателя?
Смежный вопрос: как мне остановить ветку слушателя, когда эта тема больше не представляет интереса? Вызов channel.wait () блокируется навсегда, если нет сообщений. Единственный способ, о котором я могу подумать, - это отправить в очередь фиктивное сообщение, которое его «отравит», т.е. быть интерпретированным слушателем как сигнал к остановке.