Это антипаттерн для разделения канала между несколькими потоками, и маловероятно, что вам удастся разделить его между процессами.
Практическое правило: 1 connection
на процесс и 1 channel
наthread.
Более подробную информацию по этому вопросу можно прочитать по следующим ссылкам:
- 13 распространенных ошибок RabbitMQ
- Лучшие практики RabbitMQ
- Этот поток SO дает углубленный анализ в отношении RabbitMQ и одновременного потребления
Если вы хотите объединить потребление сообщений вместеВ случае многопроцессорной обработки обычный шаблон позволяет основным процессам получать сообщения, доставлять их полезные данные в пул рабочих процессов и подтверждать их, как только они будут выполнены.
Простой пример с использованием pika.BlockingChannel
и concurrent.futures.ProcessPoolExecutor
:
def ack_message(channel, delivery_tag, _future):
"""Called once the message has been processed.
Acknowledge the message to RabbitMQ.
"""
channel.basic_ack(delivery_tag=delivery_tag)
for message in channel.consume(queue='example'):
method, properties, body = message
future = pool.submit(process_message, body)
# use partial to pass channel and ack_tag to callback function
ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag)
future.add_done_callback(ack_message_callback)
Приведенный выше цикл будет бесконечно потреблять сообщения из очереди example
и отправлять их в пул процессов.Вы можете контролировать, сколько сообщений обрабатывать одновременно, используя параметр RabbitMQ consumer prefetch .Проверьте pika.basic_qos
, чтобы увидеть, как это сделать в Python.