Я работаю над проектом с Rabbitmq, я использую шаблон RPC, в основном я получаю или потребляю сообщения из очереди, выполняю некоторую обработку и затем отправляю ответ обратно. Я использую Pika, моя цель - использовать поток для каждой задачи, поэтому для каждой задачи я буду создавать поток для этой задачи перкулярно. Я также читал, что лучшая практика - это создавать только одно Соединение и под ним много каналов, как я хочу, но я всегда получаю эту ошибку:
'start_consuming не может быть вызван из области'
pika.exceptions.RecursionError: start_consuming не может быть вызван из области другого обратного вызова BlockingConnection или BlockingChannel.
Я провел некоторое исследование и обнаружил, что Pika не является потокобезопасным, и мы должны использовать для каждого потока независимое соединение и канал. но я не хочу этого делать, поскольку это считается плохой практикой. Поэтому я хотел бы спросить здесь, если кто-то уже достиг, чтобы сделать эту работу. Я также прочитал, что это возможно, если я не использовал BlockingConnection для создания экземпляра моего соединения, а также что есть функция с именем add_callback_threadsafe, которая может сделать это возможным. но, к сожалению, нет примеров для этого, и я читаю документацию, но она сложная, и без примеров мне было трудно понять, что они хотят описать.
Моя попытка состояла в том, чтобы объявить два класса. Каждый класс будет представлять Исполнителя Задачи, который получает или потребляет сообщение из очереди и на основании этого произвел некоторую Обработку и доставил Ответ обратно. Моя идея состояла в том, чтобы разделить соединение rabbitmq между двумя Задачами, но каждая Задача получит независимый Канал. в приведенном выше коде параметр кролика, передаваемый в функцию, является классом, который содержит некоторые переменные, такие как Connection и другие функции, такие как EventSubscriber, который при вызове назначает новый канал и начинает потреблять сообщения от этого конкретного обмена и routingKey. Затем я объявляю поток и назначаю функцию подписки или использования в качестве цели для этого потока. другой класс задач выглядит так же, как этот класс, поэтому я буду загружать только этот код. в главном классе я устанавливаю соединение с rabbitmq и передаю его в качестве параметра конструктору двух классов задач.
класс On_Deregistration:
def __init__(self, rabbit):
self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq
def event(self, rabbit):
self.Subscriber = rabbit.EventSubscriber(rabbit, 'testing.test', 'test', False, onDeregistrationFromHRS # this func is task listener)
def subscribeAsync(self):
self.Subscriber.subscribe() # here i call start_consuming
def start(self):
"""start Subscribtion in an Independant Thread """
thread = threading.Thread(target = self.subscribeAsync )
thread.start()
if thread.isAlive():
print("asynchronous subscription started")
Основной класс:
Приложение класса:
def __init__(self):
self.rabbitMq = RabbitMqCommunicationInterface(host='evallx033.emea.porsche.biz', port=5672)
firstTask = On_Deregistration(self.rabbitMq)
secondTask = secondTask(self.rabbitMq)
app = App ()
ошибка: 'start_consuming не может быть вызван из области видимости'
pika.exceptions.RecursionError: start_consuming нельзя вызывать из области действия другого обратного вызова BlockingConnection или BlockingChannel
Я искал причину этой ошибки и, очевидно, что pika не является поточно-ориентированной, но для этого должно быть решение. Может быть, не используя BlockingConnection? Может быть, кто-то может дать мне пример, как это сделать, потому что я попробовал и не работал. Может быть, я что-то упустил, как реализовать многопоточность с rabbitmq