Я новичок в rabbitMQ и пытаюсь создать приложение, в котором будет 3 роли: два производителя и один потребитель.Потребитель связан с двумя очередями, которые связаны с двумя производителями.Каждый производитель отправляет сообщение в очередь с разной частотой.Мне нужно, чтобы потребитель читал поочередно от двух производителей.
Например:
Производитель 1: Отправка "Hello" каждые 2 секунды. Производитель 2: Отправка "World" каждые 5 секунд. Потребитель: Печатать все, что получает
Таким образом, ожидается, что потребитель напечатает:
hello world hello world hello world ...
Поскольку производитель 1 отправляет сообщение чаще, чем производитель2, после того как потребитель прочитал от потребителя 1, ему нужно немного подождать прибытия сообщения от производителя 2 (это проблема)
Я попытался объявить две очереди для производителей и связать ихпотребителю, но потребитель печатает только что-то вроде:
привет привет привет привет мир
Спасибо за помощь!
Обновление: вот мой код
Производитель 1:
import pika
import sys
message = 'hello'
credentials = pika.PlainCredentials('xxxx', 'xxxx)
connection =pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
while True:
channel.basic_publish(exchange='', routing_key='hello', body=message)
print('Sent message: {}'.format(message))
connection.sleep(2)
connection.close()
Производитель 2:
import pika
import sys
message = 'world'
credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='world')
while True:
channel.basic_publish(exchange='', routing_key='world', body=message)
print('Sent message: {}'.format(message))
connection.sleep(4)
connection.close()
Потребитель 1:
import pika
def callback(ch, method, properties, body):
print('Receive: {}'.format(body))
credentials = pika.PlainCredentials('xxxx', 'xxxx')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='hello')
channel.queue_declare(queue='world')
channel.basic_consume(on_message_callback=callback, queue='hello', auto_ack=True)
channel.basic_consume(on_message_callback=callback, queue='world', auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()