как использовать многопоточность с pika и rabbitmq для выполнения запросов и ответов RPC - PullRequest
0 голосов
/ 27 мая 2019

Я работаю над проектом с Rabbitmq, я использую шаблон RPC, в основном я получаю или потребляю сообщения из очереди, выполняю некоторую обработку и затем отправляю ответ обратно. Я использую Pika, моя цель - использовать поток для каждой задачи, поэтому для каждой задачи я буду создавать поток для этой задачи перкулярно. Я также читал, что лучшая практика - это создавать только одно Соединение и под ним много каналов, как я хочу, но я всегда получаю эту ошибку: 'start_consuming не может быть вызван из области' pika.exceptions.RecursionError: start_consuming не может быть вызван из области другого обратного вызова BlockingConnection или BlockingChannel.

Я провел некоторое исследование и обнаружил, что Pika не является потокобезопасным, и мы должны использовать для каждого потока независимое соединение и канал. но я не хочу этого делать, поскольку это считается плохой практикой. Поэтому я хотел бы спросить здесь, если кто-то уже достиг, чтобы сделать эту работу. Я также прочитал, что это возможно, если я не использовал BlockingConnection для создания экземпляра моего соединения, а также что есть функция с именем add_callback_threadsafe, которая может сделать это возможным. но, к сожалению, нет примеров для этого, и я читаю документацию, но она сложная, и без примеров мне было трудно понять, что они хотят описать.

Моя попытка состояла в том, чтобы объявить два класса. Каждый класс будет представлять Исполнителя Задачи, который получает или потребляет сообщение из очереди и на основании этого произвел некоторую Обработку и доставил Ответ обратно. Моя идея состояла в том, чтобы разделить соединение rabbitmq между двумя Задачами, но каждая Задача получит независимый Канал. в приведенном выше коде параметр кролика, передаваемый в функцию, является классом, который содержит некоторые переменные, такие как Connection и другие функции, такие как EventSubscriber, который при вызове назначает новый канал и начинает потреблять сообщения от этого конкретного обмена и routingKey. Затем я объявляю поток и назначаю функцию подписки или использования в качестве цели для этого потока. другой класс задач выглядит так же, как этот класс, поэтому я буду загружать только этот код. в главном классе я устанавливаю соединение с rabbitmq и передаю его в качестве параметра конструктору двух классов задач.

класс On_Deregistration:

def __init__(self, rabbit):
   self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq

def event(self, rabbit):

    self.Subscriber = rabbit.EventSubscriber(rabbit,  'testing.test',  'test', False,  onDeregistrationFromHRS # this func is task listener)

def subscribeAsync(self):
    self.Subscriber.subscribe() # here i call start_consuming

def start(self):
    """start Subscribtion in an Independant Thread  """
    thread = threading.Thread(target = self.subscribeAsync )  
    thread.start()
    if thread.isAlive():
        print("asynchronous subscription started")

Основной класс:

Приложение класса:

def __init__(self):

    self.rabbitMq = RabbitMqCommunicationInterface(host='evallx033.emea.porsche.biz', port=5672)
    firstTask =  On_Deregistration(self.rabbitMq)
    secondTask =  secondTask(self.rabbitMq)

app = App ()

ошибка: 'start_consuming не может быть вызван из области видимости'

pika.exceptions.RecursionError: start_consuming нельзя вызывать из области действия другого обратного вызова BlockingConnection или BlockingChannel

Я искал причину этой ошибки и, очевидно, что pika не является поточно-ориентированной, но для этого должно быть решение. Может быть, не используя BlockingConnection? Может быть, кто-то может дать мне пример, как это сделать, потому что я попробовал и не работал. Может быть, я что-то упустил, как реализовать многопоточность с rabbitmq

1 Ответ

0 голосов
/ 17 июля 2019

так что после долгих исследований я выяснил, что Пика небезопасна. ну, на данный момент, по крайней мере, может быть, в новых версиях это будет потокобезопасным. так что теперь для моего проекта я перестал использовать Pika, и я использую Rabbitpy, который является поточно-ориентированной библиотекой. но я должен сказать, что Pika - отличная библиотека, и я считаю, что API лучше описан и структурирован, чем rabbitpy, но для моего проекта было обязательным использование многопоточности, и поэтому Pika на данный момент был плохим выбором. Я надеюсь, что это поможет кому-то в будущем

...