Я хотел бы использовать упомянутую здесь функцию amq.rabbitmq.reply-to: https://www.rabbitmq.com/direct-reply-to.html
Я могу заставить работать только функцию ответа путем явного объявления очереди reply_to в издателе.
Как я могу использовать функцию amq.rabbitmq.reply-to?
# Consumer side
import asyncio
import logging
import yaml
from pfrabbit.asynced.async_broker import Broker
from pfrabbit.asynced.async_consumer import ConsumerManager
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)
login = None
with open('./rabbit_login.yaml') as file:
login = yaml.load(file, Loader=yaml.FullLoader)
async def main():
broker = await Broker.get_rmc_broker(**login)
LOG.setLevel(logging.INFO)
LOG.info("in main()")
channel = await broker.channel
exchange = await channel.declare_exchange("direct", auto_delete=True)
queue = await channel.declare_queue("test", durable=True, auto_delete=True)
await queue.bind(exchange, "test")
async def handle(message):
print("in handle(message)")
print(message.body)
print(f"reply_to: {message.reply_to}")
message.body = bytes("successful reply", "utf-8")
await channel.declare_queue(message.reply_to, durable=True, auto_delete=True)
await exchange.publish(message, message.reply_to)
await channel.set_qos(prefetch_count=1)
await queue.consume(handle)
if __name__=='__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
try:
print("consumer_example running...")
loop.run_forever()
finally:
loop.shutdown_asyncgens()
Сторона publi sh - используя объявленный ответ -to queue:
import asyncio
import logging
import aio_pika
import yaml
from pfrabbit.asynced.async_broker import Broker
from pfrabbit.asynced.async_consume_publish import PFMessage
from pfrabbit.asynced.async_consumer import ConsumerManager
LOG = logging.getLogger(__name__)
login = None
with open('./rabbit_login.yaml') as file:
login = yaml.load(file, Loader=yaml.FullLoader)
async def main():
broker = await Broker.get_rmc_broker(**login)
channel = await broker.channel
exchange = await channel.declare_exchange("direct", auto_delete=True)
queue = await channel.declare_queue("test_reply", auto_delete=True, durable=True)
await queue.bind(exchange, "test_reply")
msg = PFMessage(bytes("my pf message", "utf-8"),
reply_to="test_reply",
correlation_id=77)
await exchange.publish(msg, "test")
arrived = False
while not arrived:
try:
out = await queue.get(timeout=4)
arrived = True
print(out)
print(out.body)
except aio_pika.exceptions.QueueEmpty:
asyncio.sleep(4)
if __name__=='__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Это aio_pika, но шаблон должен быть таким же.