Какой лучший шаблон для разработки асинхронного RPC-приложения с использованием Python, Pika и AMQP? - PullRequest
9 голосов
/ 13 сентября 2011

Модуль продюсера моего приложения запускается пользователями, которые хотят отправить работу на небольшой кластер. Он отправляет подписки в JSON-форме через посредник сообщений RabbitMQ.

Я испробовал несколько стратегий, и лучшая на данный момент - следующая, которая до сих пор не работает полностью:

На каждом кластерном компьютере запускается модуль-потребитель, который подписывается на очередь AMQP и выдает prefetch_count , чтобы сообщить брокеру, сколько задач он может запустить одновременно.

Мне удалось заставить его работать, используя SelectConnection из библиотеки Pika AMQP. И потребитель, и производитель запускают два канала, один из которых подключен к каждой очереди. Производитель отправляет запросы по каналу [A] и ожидает ответов в канале [B], а потребитель ожидает запросов по каналу [A] и отправляет ответы по каналу [B]. Однако кажется, что когда потребитель запускает обратный вызов, который вычисляет ответ, он блокируется, поэтому у меня каждый раз выполняется только одна задача для каждого потребителя.

Что мне нужно в итоге:

  1. потребитель [A] подписывает свои задачи (около 5k каждый раз) на кластер
  2. брокер отправляет N сообщений / запросов каждому потребителю, где N - количество одновременных задач, которые он может обработать
  3. когда одна задача завершена, потребитель отвечает брокеру / производителю с результатом
  4. производитель получает ответы, обновляет статус вычислений и, в конце концов, печатает некоторые отчеты

Ограничения:

  • Если другой пользователь отправляет работу, все его задачи будут поставлены в очередь после предыдущего пользователя (я полагаю, это автоматически выполняется в системе очередей, но я не думал о последствиях для многопоточной среды)
  • Задачи должны быть отправлены, но порядок ответа на них не важен

UPDATE

Я изучил немного дальше, и моя настоящая проблема заключается в том, что я использую простую функцию в качестве обратного вызова для функции SelectConnection.channel.basic_consume () pika. Моя последняя (нереализованная) идея - передать потоковую функцию вместо обычной, чтобы обратный вызов не блокировался, а потребитель мог продолжать слушать.

Ответы [ 3 ]

3 голосов
/ 08 апреля 2013

Как вы заметили, ваш процесс блокируется при запуске обратного вызова.Есть несколько способов справиться с этим в зависимости от того, что делает ваш обратный вызов.

Если ваш обратный вызов связан с вводом-выводом (при выполнении большого количества сетевых или дисковых операций ввода-вывода), вы можете использовать либо потоки, либо решение на основе гринлета, напримеркак gevent , eventlet или теплица .Имейте в виду, однако, что Python ограничен GIL (Global Interpreter Lock), что означает, что только один кусок кода Python когда-либо выполняется в одном процессе Python.Это означает, что если вы выполняете много вычислений с использованием кода Python, эти решения, скорее всего, будут не намного быстрее, чем у вас уже есть.

Другой вариант - реализовать вашего потребителя как несколько процессов с использованием многопроцессорной обработки ..Я обнаружил, что многопроцессорная обработка очень полезна при параллельной работе.Вы можете реализовать это либо с помощью Queue , с родительским процессом, который будет потребителем, и отрабатывать работу для своих детей, либо просто запустив несколько процессов, каждый из которых потребляет самостоятельно.Я бы посоветовал, если ваше приложение не является слишком параллельным (тысячи рабочих), просто запустить несколько рабочих, каждый из которых потребляет из своего собственного соединения.Таким образом, вы можете использовать функцию подтверждения AMQP, поэтому, если потребитель умирает во время обработки задачи, сообщение автоматически отправляется обратно в очередь и принимается другим работником, а не просто теряет запрос.

Последний вариант, если вы контролируете производителя и он также написан на Python, - это использовать библиотеку задач, такую ​​как celery , чтобы абстрагировать работу задачи / очереди для вас.Я использовал сельдерей в нескольких крупных проектах и ​​нашел, что он очень хорошо написан.Он также будет обрабатывать многочисленные потребительские проблемы для вас с соответствующей конфигурацией.

0 голосов
/ 06 марта 2013

Будучи неопытным в многопоточности, моя установка будет запускать несколько пользовательских процессов (число которых в основном равно количеству предварительных выборок). Каждый из них подключался к двум очередям и с удовольствием обрабатывал задания, не зная о существовании друг друга.

0 голосов
/ 08 апреля 2012

Ваша настройка звучит хорошо для меня.И вы правы, вы можете просто установить обратный вызов, чтобы запустить поток, и связать его с отдельным обратным вызовом, когда поток завершит очередь ответа обратно по каналу B.

По сути, ваши потребители должны иметь очередьих собственные (размер N, количество параллелизма они поддерживают).Когда запрос поступает через канал A, он должен сохранить результат в очереди, общей для основного потока с Pika и рабочих потоков в пуле потоков.Как только он будет поставлен в очередь, pika должна ответить ACK, и ваш рабочий поток проснется и начнет обработку.

Как только рабочий завершит свою работу, он вернет результат в отдельную очередь результатов.и выполните обратный вызов в основной поток, чтобы отправить его обратно потребителю.

Вы должны позаботиться и убедиться, что рабочие потоки не мешают друг другу, если они используют какие-либо общие ресурсы, но этоотдельная тема.

...