Как использовать amq.rabbitmq.reply-to в python? - PullRequest
0 голосов
/ 20 июня 2020

Я хотел бы использовать упомянутую здесь функцию amq.rabbitmq.reply-to: https://www.rabbitmq.com/direct-reply-to.html

Я могу заставить работать только функцию ответа путем явного объявления очереди reply_to в издателе.

Как я могу использовать функцию amq.rabbitmq.reply-to?

# Consumer side

import asyncio

import logging

import yaml

from pfrabbit.asynced.async_broker import Broker
from pfrabbit.asynced.async_consumer import ConsumerManager

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)

login = None
with open('./rabbit_login.yaml') as file:
    login = yaml.load(file, Loader=yaml.FullLoader)



async def main():
    broker = await Broker.get_rmc_broker(**login)
    LOG.setLevel(logging.INFO)
    LOG.info("in main()")
    channel = await broker.channel
    exchange = await channel.declare_exchange("direct", auto_delete=True)
    queue = await channel.declare_queue("test", durable=True, auto_delete=True)

    await queue.bind(exchange, "test")

    async def handle(message):
        print("in handle(message)")
        print(message.body)
        print(f"reply_to: {message.reply_to}")
        message.body = bytes("successful reply", "utf-8")
        await channel.declare_queue(message.reply_to, durable=True, auto_delete=True)

        await exchange.publish(message, message.reply_to)

    await channel.set_qos(prefetch_count=1)
    await queue.consume(handle)




if __name__=='__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    try:
        print("consumer_example running...")
        loop.run_forever()
    finally:
        loop.shutdown_asyncgens()

Сторона publi sh - используя объявленный ответ -to queue:

import asyncio

import logging

import aio_pika
import yaml

from pfrabbit.asynced.async_broker import Broker
from pfrabbit.asynced.async_consume_publish import PFMessage
from pfrabbit.asynced.async_consumer import ConsumerManager

LOG = logging.getLogger(__name__)

login = None
with open('./rabbit_login.yaml') as file:
    login = yaml.load(file, Loader=yaml.FullLoader)



async def main():
    broker = await Broker.get_rmc_broker(**login)
    channel = await broker.channel
    exchange = await channel.declare_exchange("direct", auto_delete=True)
    queue = await channel.declare_queue("test_reply", auto_delete=True, durable=True)
    await queue.bind(exchange, "test_reply")
    msg = PFMessage(bytes("my pf message", "utf-8"),
                    reply_to="test_reply",
                    correlation_id=77)
    await exchange.publish(msg, "test")
    arrived = False
    while not arrived:
        try:
            out = await queue.get(timeout=4)
            arrived = True
            print(out)
            print(out.body)
        except aio_pika.exceptions.QueueEmpty:
            asyncio.sleep(4)



if __name__=='__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Это aio_pika, но шаблон должен быть таким же.

...