Можно ли использовать функцию прямого ответа RabbitMQ с потребителем генератора Pika в Python? - PullRequest
2 голосов
/ 01 июля 2019

Я хотел бы использовать функцию прямой ответ на RabbitMQ с клиентской библиотекой Pika в Python.Он работает с базовым потребителем.Но возникает следующее исключение с потребителем генератора :

pika.exceptions.ChannelClosedByBroker: (406, 'PRECONDITION_FAILED - потребитель быстрого ответа не существует')

Есть ли способ использовать функцию прямого ответа с потребителем генератора?

Пример кода клиента с использованием базового потребителя (работает):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.start_consuming()

Пример клиентского кода с использованием генератора генератора (возникает исключение):

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)

    for (method, properties, body) in channel.consume(
            queue="amq.rabbitmq.reply-to", auto_ack=True):
        handle(channel, method, properties, body)

Environment. - Windows 10, RabbitMQ 3.7.13, CPython 3.7.3,Pika 1.0.1.

Примечание. - вызов метода basic_consume после метода basic_publish в примере клиентского кода с использованием базового потребителя вызывает то же самоеисключение как при использовании генератора-потребителя:

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)
    channel.basic_consume(queue="amq.rabbitmq.reply-to",
                          on_message_callback=handle, auto_ack=True)
    channel.start_consuming()

1 Ответ

0 голосов
/ 03 июля 2019

Как подсказывает Люк Баккен здесь , это делает трюк:

import pika


def handle(channel, method, properties, body):
    message = body.decode()
    print("received:", message)


connection = pika.BlockingConnection()
channel = connection.channel()

with connection, channel:
    message = "hello"
    next(channel.consume(queue="amq.rabbitmq.reply-to", auto_ack=True,
                         inactivity_timeout=0.1))
    channel.basic_publish(
        exchange="", routing_key="test", body=message.encode(),
        properties=pika.BasicProperties(reply_to="amq.rabbitmq.reply-to"))
    print("sent:", message)

    for (method, properties, body) in channel.consume(
            queue="amq.rabbitmq.reply-to", auto_ack=True):
        handle(channel, method, properties, body)
...