Один потребитель читает поочередно из нескольких очередей - PullRequest
0 голосов
/ 06 июня 2019

Я новичок в rabbitMQ и пытаюсь создать приложение, в котором будет 3 роли: два производителя и один потребитель.Потребитель связан с двумя очередями, которые связаны с двумя производителями.Каждый производитель отправляет сообщение в очередь с разной частотой.Мне нужно, чтобы потребитель читал поочередно от двух производителей.

Например:

Производитель 1: Отправка "Hello" каждые 2 секунды. Производитель 2: Отправка "World" каждые 5 секунд. Потребитель: Печатать все, что получает

Таким образом, ожидается, что потребитель напечатает:

hello world hello world hello world ...

Поскольку производитель 1 отправляет сообщение чаще, чем производитель2, после того как потребитель прочитал от потребителя 1, ему нужно немного подождать прибытия сообщения от производителя 2 (это проблема)

Я попытался объявить две очереди для производителей и связать ихпотребителю, но потребитель печатает только что-то вроде:

привет привет привет привет мир

Спасибо за помощь!

Обновление: вот мой код

Производитель 1:

import pika
import sys

message = 'hello'


credentials = pika.PlainCredentials('xxxx', 'xxxx)
connection =pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')

while True:
    channel.basic_publish(exchange='', routing_key='hello', body=message)
    print('Sent message: {}'.format(message))
    connection.sleep(2)

connection.close()

Производитель 2:

import pika
import sys

message = 'world'


credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()


channel.queue_declare(queue='world')

while True:
    channel.basic_publish(exchange='', routing_key='world', body=message)
    print('Sent message: {}'.format(message))
    connection.sleep(4)

connection.close()

Потребитель 1:

import pika

def callback(ch, method, properties, body):
    print('Receive: {}'.format(body))


credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='world')

channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)
channel.basic_consume(on_message_callback=callback, queue='world', auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')


channel.start_consuming()

1 Ответ

0 голосов
/ 07 июня 2019

Поскольку потребитель может потреблять только из одной очереди, вам нужно будет убедиться, что все сообщения направляются в эту очередь.

Затем пользователь должен обработать сообщения.Пришлось бы использовать API опроса, чтобы получать отдельные сообщения.В зависимости от того, какой потребитель опубликовал каждое сообщение, потребитель должен будет действовать по-разному.Он может хранить локальное хранилище сообщений, поступающих от производителя 1, которые поступили до того, как было обработано сообщение, поступающее от производителя 2.Cosumer будет откладывать обработку сообщений, хранящихся в этом хранилище, до тех пор, пока не поступит сообщение от производителя 2.Только тогда он возьмет первое сообщение из этого магазина и начнет действовать.

Редактировать:

В коде, который вы добавили к своему вопросу, у вас естьодин канал (это хорошо), но два потребителя , по одному на каждый звонок channel.basic_consume.Оба потребителя используют один и тот же метод обратного вызова callback.Именно этот метод должен был бы реализовать логику, которую я описал выше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...