Pika RabbitMQ Publi sh от потребителя - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть потребитель RabbitMQ. Я бы хотел, чтобы этот потребитель выполнил некоторую обработку сообщений, смоделированную time.sleep(10), а затем опубликовал sh сообщение в другой очереди. Я знаю, что обратный вызов потребителя имеет канал, который теоретически может быть использован для публикации sh, но это кажется плохой реализацией, потому что если basic_publish() каким-то образом удастся принудительно закрыть канал, то потребитель умрет. Каков наилучший способ справиться с этим?

import time
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='original_queue', exclusive=True)

channel.queue_bind(exchange='logs', queue='original_queue')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    time.sleep(10)
    ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')

channel.basic_consume(
    queue='original_queue', on_message_callback=callback, auto_ack=True)

channel.start_consuming()

1 Ответ

1 голос
/ 29 апреля 2020

Вы можете реализовать своего потребителя таким образом, чтобы он автоматически переподключался к серверу RabbitMQ, если соединение закрывается. Надеюсь, это поможет (я не особо задумывался над дизайном, не стесняйтесь предлагать некоторые из них!)

import time
import pika

reconnect_on_failure = True


def consumer(connection, channel):

    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    result = channel.queue_declare(queue='original_queue', exclusive=True)
    channel.queue_bind(exchange='logs', queue='original_queue')
    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        time.sleep(10)
        ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')

    channel.basic_consume(
  queue='original_queue', on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


def get_connection_and_channel():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()


def start(reconnect_on_failure):
    connection, channel = get_connection_and_channel()
    consumer(connection, channel)
    # the if condition will be executed when the consumer's start_consuming loop exists
    if reconnect_on_failure:
        # cleanly close the connection and channel
        if not connection.is_closed():
            connection.close()
        if not channel.is_close():
            channel.close()
        start(reconnect_on_failure)


start(reconnect_on_failure)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...