Управление автоматическим удалением очередей RabbitMQ в разных потоках.питон - PullRequest
0 голосов
/ 18 мая 2018

Я хотел бы знать, является ли это правильным способом управления очередями auto_delete в разных потоках (в основном для проблем тестирования, когда я не хочу, чтобы очереди RabbitMQ оставались при закрытии соединения)

import pika
from threading import Thread

class ConsumerThread(Thread):

    def __init__(self, callback, queue):
        Thread.__init__(self)
        self.setDaemon(True)

        self.callback = callback
        self.queue = queue

    def run(self):
        # stablish connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(CONNECTION['address'], CONNECTION['port'], CONNECTION['vhost'], CONNECTION['credentials']))
        channel = connection.channel()

        # create the auto-delete queue
        channel.queue_declare(queue=self.queue, auto_delete=True)

        # start consuming
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(self.callback, queue=self.queue)
        channel.start_consuming()

class Factory:

    def __init__(self):
        self.queue_init = "init.queue"
        self.queue_start = "start.queue"

        threads = [ConsumerThread(self.init_callback, self.queue_init), ConsumerThread(self.start_callback, self.queue_start)]
        for t in threads:
            t.start()

    def init_callback(self, ch, method, properties, body):
        # doing something

    def start_callback(self, ch, method, properties, body):
        # doing something

1 Ответ

0 голосов
/ 18 мая 2018

Команда RabbitMQ отслеживает список рассылки rabbitmq-users и только иногда отвечает на вопросы по StackOverflow.


Pika не является поточно-ориентированной.Вы должны быть уверены, что вызовы метода BlockingConnection происходят в том же потоке, в котором запущено соединение и канал.Исходя из вашего кода, я не уверен, что это произойдет, поскольку вы вызываете обратные вызовы в классе Factory, что кажется странным.Почему бы не использовать эти методы в ConsumerThread вместо этого?

Pika 0.12 и более поздних версий будет включать метод add_callback_threadsafe, который будет планировать выполнение метода в потоке ioloop.

...