Совместное использование канала RabbitMQ между несколькими процессами Python - PullRequest
0 голосов
/ 17 января 2019

Я хочу поделиться BlockingChannel между несколькими процессами Python. Для отправки basic_ack из другого процесса Python.

Как поделиться BlockingChannel между несколькими процессами Python.

Ниже приведен код:

self.__connection__ = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.__channel__ = self.__connection__.channel()

Я пытался создать дамп, используя pickle, но он не позволяет сбросить канал и выдать ошибку can't pickle select.epoll objects используя следующий код

filepath = "temp/" + "merger_channel.sav"
pickle.dump(self.__channel__, open(filepath, 'wb'))

ЗАДАЧА:

Цель - отправить basic_ack из канала из других процессов python.

1 Ответ

0 голосов
/ 17 января 2019

Это антипаттерн для разделения канала между несколькими потоками, и маловероятно, что вам удастся разделить его между процессами.

Практическое правило: 1 connection на процесс и 1 channel наthread.

Более подробную информацию по этому вопросу можно прочитать по следующим ссылкам:

  1. 13 распространенных ошибок RabbitMQ
  2. Лучшие практики RabbitMQ
  3. Этот поток SO дает углубленный анализ в отношении RabbitMQ и одновременного потребления

Если вы хотите объединить потребление сообщений вместеВ случае многопроцессорной обработки обычный шаблон позволяет основным процессам получать сообщения, доставлять их полезные данные в пул рабочих процессов и подтверждать их, как только они будут выполнены.

Простой пример с использованием pika.BlockingChannel и concurrent.futures.ProcessPoolExecutor:

def ack_message(channel, delivery_tag, _future):
    """Called once the message has been processed.
    Acknowledge the message to RabbitMQ.
    """
    channel.basic_ack(delivery_tag=delivery_tag)

for message in channel.consume(queue='example'):
    method, properties, body = message

    future = pool.submit(process_message, body)
    # use partial to pass channel and ack_tag to callback function
    ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag)
    future.add_done_callback(ack_message_callback)      

Приведенный выше цикл будет бесконечно потреблять сообщения из очереди example и отправлять их в пул процессов.Вы можете контролировать, сколько сообщений обрабатывать одновременно, используя параметр RabbitMQ consumer prefetch .Проверьте pika.basic_qos, чтобы увидеть, как это сделать в Python.

...