Фактический дизайн:
Для тех, кто возвращается к этому вопросу, полезный ответ ниже подтолкнул меня к работоспособному дизайну, который работает нормально. Три идеи были ключевыми:
- Eventlet - это очень безопасная среда - если два гринлета пытаются по
recv()
или оба пытаются send()
из одного и того же сокета одновременно, то Eventlet элегантно убивает второй гринлет с исключением. Это великолепно и означает, что простые исключения, а не невозможные для воспроизведения ошибки чередования данных, будут иметь место, если amqplib
«зеленеет» плохо.
- Методы
amqplib
делятся примерно на две группы: wait()
зацикливается внутри recv()
до тех пор, пока не будет собрано сообщение AMQP, в то время как другие методы send()
возвращают сообщения и не будут пытаться выполнить свои собственные recv()
. Это потрясающе удача, учитывая, что авторы amqplib
даже не подозревали, что кто-то попытается «озеленить» свою библиотеку! Это означает, что отправка сообщений не только защищена от обратного вызова, вызванного wait()
, но и то, что сообщения могут также безопасно отправляться из других гринлетов, которые полностью находятся вне контроля цикла wait()
. Эти безопасные методы, которые можно вызывать из любого гринлета, а не только из обратного вызова wait()
:
basic_ack
basic_consume
с nowait=True
basic_publish
basic_recover
basic_reject
exchange_declare
с nowait=True
exchange_delete
с nowait=True
queue_bind
с nowait=True
queue_unbind
с nowait=True
queue_declare
с nowait=True
queue_delete
с nowait=True
queue_purge
с nowait=True
- Семафоры могут использоваться в качестве блокировок: инициализируйте семафор с количеством
1
, а затем acquire()
и release()
для блокировки и разблокировки. Все мои асинхронные гринлеты, которые хотят писать сообщения, могут использовать такую блокировку, чтобы их отдельные вызовы send()
не чередовались и не разрушали протокол AMQP.
Итак, мой код выглядит примерно так:
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
class Processor(object):
def __init__(self):
write_lock = eventlet.semaphore.Semaphore(1)
def listening_greenlet(channel):
# start this using eventlet.spawn_n()
# create Connection and self.channel
self.channel.basic_consume(queue, callback=self.consume)
while True:
self.channel.wait()
def safe_publish(channel, *args, **kw):
with write_lock: # yes, Eventlet supports this!
channel.basic_publish(*args, **kw)
def consume(message):
# Returning immediately frees the wait() loop
eventlet.spawn_n(self.process, message)
def process(message):
# do whatever I want
# whenever I am done, I can async reply:
self.safe_publish(...)
Наслаждайтесь!
Оригинальный вопрос:
Представьте себе сотни сообщений AMQP, поступающих каждую минуту в маленькое приложение Python Eventlet , каждое из которых необходимо обработать и получить ответ - где загрузка ЦП обработки будет минимальной, но может потребовать ожидания ответов от других сервисов и розеток.
Чтобы позволить, скажем, 100 сообщениям обрабатываться одновременно, я, конечно, мог бы развернуть 100 отдельных TCP-соединений с RabbitMQ и иметь работника для каждого соединения, которое получает, обрабатывает и отвечает на отдельные сообщения в режиме блокировки. Но для сохранения TCP-соединений я бы предпочел создать только одно AMQP-соединение, позволить RabbitMQ на полной скорости передавать мне сообщения по конвейеру, передавать эти задачи работникам и отправлять ответы обратно по завершении каждого работника:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
Обратите внимание:
- Очередь Eventlet может элегантно распределять входящую работу среди рабочих, когда они становятся доступными для дополнительной работы.
- Возможно даже управление потоком данных из RabbitMQ: я могу получать сообщения ACK только до тех пор, пока все мои сотрудники не будут заняты, а затем ждать, прежде чем отправлять дополнительные ACK, пока очередь не начнет опустошаться.
- Работа почти наверняка будет выполнена не по порядку: один запрос может закончиться быстро, в то время как другое событие, пришедшее ранее, займет гораздо больше времени; и некоторые запросы могут никогда не завершиться вообще; поэтому рабочие будут возвращать ответы в непредсказуемом и асинхронном порядке.
Я планировал написать это с помощью Eventlet и py-amqplib , увидев этот привлекательный пост в блоге о том, как легко эту библиотеку AMQP можно вставить в модель обработки Eventlet:
http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
Моя проблемаявляется то, что, прочитав документацию для обеих библиотек, исходный код amqplib и большую часть исходного кода Eventlet, я не могу понять, как я могу научить eventlet, которому принадлежит соединение AMQP - eventlet с именем connect_to_host()
в посте блога - до также просыпаться, когда работник завершает свою работу и генерирует ответ. Метод wait()
в amqplib можно активировать только через активность на сокете AMQP. Хотя мне кажется, что я должен быть в состоянии заставить рабочих записывать свои ответы в очередь, и connect_to_host()
eventlet просыпается либо , когда приходит новое входящее сообщение или , когда работник готов ответить на вопрос, и я не могу найти в журнале событий никакого способа сказать: «разбуди меня, когда или из этих событий произойдет».
Мне пришло в голову, что рабочие могут попытаться управлять объектом соединения AMQP - или даже необработанным сокетом - и записывать свои собственные сообщения через TCP; но кажется, что блокировки были бы необходимы для предотвращения чередования исходящих рабочих сообщений друг с другом или с сообщениями ACK, записанными в основном приемнике событий слушателя, и я также не могу найти, где блокировки доступны в Eventlet.
Все это заставляет меня чувствовать себя почти уверенным, что я пытаюсь решить эту проблему как-то точно в обратном направлении. Неужели проблема, подобная этой - разрешить безопасное совместное использование одного соединения между слушателем-диспетчером и многими работниками, - просто не отображается на модель сопрограммы и требует полноценной асинхронной библиотеки? (В этом случае: есть ли один, который вы бы порекомендовали для этой проблемы, и как будет происходить мультиплексирование между входящими сообщениями и исходящими рабочими ответами? Сегодня я не нашел чистого решения, пробующего комбинации, такие как Pika + ioloop - хотя я только что видел другой библиотека, stormed_amqp , которая может работать лучше, чем Пика.) Или мне действительно нужно вернуться к реальным живым потокам Python, если я хочу чистый и поддерживаемый код, который может задействовать эту модель? Я открыт для всех вариантов.
Спасибо за любую помощь или идеи! Я продолжаю думать, что у меня почти полностью отсутствует параллелизм в Python, и потом я снова узнаю, что это не так. :) И я надеюсь, что вам понравился рисунок ASCII выше в любом случае.