Подписка на нескольких производителей в RabbitMQ - PullRequest
0 голосов
/ 24 сентября 2018

У меня есть одно сомнение.прежде всего, дайте мне знать, я думаю, что правильно или нет.здесь идет моя постановка проблемы.У меня есть два продюсера.seller_1 и seller_2 и один потребитель

provider_1, продюсер_2 выполнит некоторую обработку (они независимы и имеют один общий атрибут) и опубликует свои собственные отчеты.потребитель должен начать свой процесс только тогда, когда продюсер_1 и продюсер_2 отправят свои отчеты в очередь.

что я сделал до сих пор:

import pika, json

# will load XX, XXX from some config file
# callback_function will accept message as parameter

class Event(): 
    def __init__(name):
        self.connect()
        self.name = name
        self.callback_function = None

    def connect():
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(....))
        self.channel = self.connection.channel()
        seld.db_store = ....


    def subscribe(callback):
        self.callback_function = callback
        self.channel.exchange_declare(exchange=XX, exchange_type="fanout", durable=True)
        r = self.channel.queue_declare(exclusive=True)
        queue_name = r.method.queue
        self.channel.queue_bind(exchange=XX, queue=queue_name)
        self.channel.basic_consume(self.callback, queue=queue_name, no_ack=False)
        self.channel.start_consuming()

    def callback(self, ch, method, properties, body):
        message = json.loads(body)
        self.db_store[self.name].append(message[shared_attrib])
        self.callback_function(message)
        ch.basic_ack(delivery_tag=method.delivery_tag)


producer_1 = Event('PRODUCER_1')
producer_1 = Event('PRODUCER_2')

def somefunction(message):
    db_store = ...
    if shared_attrib in db_store[PRODUCER_1]:
        if shared_attrib in db_store[PRODUCER_2]:
            # do some process

producer_1.subscribe(somefunction)
producer_2.subscribe(somefunction)  

в чем моя проблема здесь.

  • method_2.subscribe метод вызываться не будет.из-за метода basic_consume, который находится в provider_1.subscribe

  • Я могу применить многопоточность для callback_function внутри метода подписки, но мы не должны использовать потоки (точно не уверен. но я помню, я изучалэто где-то)

...