Может ли eventlet управлять соединением AMQP с асинхронной передачей сообщений как внутри, так и снаружи? - PullRequest
8 голосов
/ 02 ноября 2011

Фактический дизайн:

Для тех, кто возвращается к этому вопросу, полезный ответ ниже подтолкнул меня к работоспособному дизайну, который работает нормально. Три идеи были ключевыми:

  1. Eventlet - это очень безопасная среда - если два гринлета пытаются по recv() или оба пытаются send() из одного и того же сокета одновременно, то Eventlet элегантно убивает второй гринлет с исключением. Это великолепно и означает, что простые исключения, а не невозможные для воспроизведения ошибки чередования данных, будут иметь место, если amqplib «зеленеет» плохо.
  2. Методы amqplib делятся примерно на две группы: wait() зацикливается внутри recv() до тех пор, пока не будет собрано сообщение AMQP, в то время как другие методы send() возвращают сообщения и не будут пытаться выполнить свои собственные recv(). Это потрясающе удача, учитывая, что авторы amqplib даже не подозревали, что кто-то попытается «озеленить» свою библиотеку! Это означает, что отправка сообщений не только защищена от обратного вызова, вызванного wait(), но и то, что сообщения могут также безопасно отправляться из других гринлетов, которые полностью находятся вне контроля цикла wait(). Эти безопасные методы, которые можно вызывать из любого гринлета, а не только из обратного вызова wait():
    1. basic_ack
    2. basic_consume с nowait=True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declare с nowait=True
    7. exchange_delete с nowait=True
    8. queue_bind с nowait=True
    9. queue_unbind с nowait=True
    10. queue_declare с nowait=True
    11. queue_delete с nowait=True
    12. queue_purge с nowait=True
  3. Семафоры могут использоваться в качестве блокировок: инициализируйте семафор с количеством 1, а затем acquire() и release() для блокировки и разблокировки. Все мои асинхронные гринлеты, которые хотят писать сообщения, могут использовать такую ​​блокировку, чтобы их отдельные вызовы send() не чередовались и не разрушали протокол AMQP.

Итак, мой код выглядит примерно так:

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(channel):
        # start this using eventlet.spawn_n()
        # create Connection and self.channel
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(channel, *args, **kw):
        with write_lock:  # yes, Eventlet supports this!
            channel.basic_publish(*args, **kw)     

    def consume(message):
        # Returning immediately frees the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(message):
        # do whatever I want
        # whenever I am done, I can async reply:
        self.safe_publish(...)

Наслаждайтесь!

Оригинальный вопрос:

Представьте себе сотни сообщений AMQP, поступающих каждую минуту в маленькое приложение Python Eventlet , каждое из которых необходимо обработать и получить ответ - где загрузка ЦП обработки будет минимальной, но может потребовать ожидания ответов от других сервисов и розеток.

Чтобы позволить, скажем, 100 сообщениям обрабатываться одновременно, я, конечно, мог бы развернуть 100 отдельных TCP-соединений с RabbitMQ и иметь работника для каждого соединения, которое получает, обрабатывает и отвечает на отдельные сообщения в режиме блокировки. Но для сохранения TCP-соединений я бы предпочел создать только одно AMQP-соединение, позволить RabbitMQ на полной скорости передавать мне сообщения по конвейеру, передавать эти задачи работникам и отправлять ответы обратно по завершении каждого работника:

                                       +--------+
                                +------| worker | <-+
                                |      +--------+   |
                                |      +--------+   |
                                | +----| worker | <-+
                                | |    +--------+   |
                                | |    +--------+   |
                                | | +--| worker | <-+
                                | | |  +--------+   |
                                v v v               |
                           +------------+           |
 RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
                           +------------+

Обратите внимание:

  • Очередь Eventlet может элегантно распределять входящую работу среди рабочих, когда они становятся доступными для дополнительной работы.
  • Возможно даже управление потоком данных из RabbitMQ: я могу получать сообщения ACK только до тех пор, пока все мои сотрудники не будут заняты, а затем ждать, прежде чем отправлять дополнительные ACK, пока очередь не начнет опустошаться.
  • Работа почти наверняка будет выполнена не по порядку: один запрос может закончиться быстро, в то время как другое событие, пришедшее ранее, займет гораздо больше времени; и некоторые запросы могут никогда не завершиться вообще; поэтому рабочие будут возвращать ответы в непредсказуемом и асинхронном порядке.

Я планировал написать это с помощью Eventlet и py-amqplib , увидев этот привлекательный пост в блоге о том, как легко эту библиотеку AMQP можно вставить в модель обработки Eventlet:

http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/

Моя проблемаявляется то, что, прочитав документацию для обеих библиотек, исходный код amqplib и большую часть исходного кода Eventlet, я не могу понять, как я могу научить eventlet, которому принадлежит соединение AMQP - eventlet с именем connect_to_host() в посте блога - до также просыпаться, когда работник завершает свою работу и генерирует ответ. Метод wait() в amqplib можно активировать только через активность на сокете AMQP. Хотя мне кажется, что я должен быть в состоянии заставить рабочих записывать свои ответы в очередь, и connect_to_host() eventlet просыпается либо , когда приходит новое входящее сообщение или , когда работник готов ответить на вопрос, и я не могу найти в журнале событий никакого способа сказать: «разбуди меня, когда или из этих событий произойдет».

Мне пришло в голову, что рабочие могут попытаться управлять объектом соединения AMQP - или даже необработанным сокетом - и записывать свои собственные сообщения через TCP; но кажется, что блокировки были бы необходимы для предотвращения чередования исходящих рабочих сообщений друг с другом или с сообщениями ACK, записанными в основном приемнике событий слушателя, и я также не могу найти, где блокировки доступны в Eventlet.

Все это заставляет меня чувствовать себя почти уверенным, что я пытаюсь решить эту проблему как-то точно в обратном направлении. Неужели проблема, подобная этой - разрешить безопасное совместное использование одного соединения между слушателем-диспетчером и многими работниками, - просто не отображается на модель сопрограммы и требует полноценной асинхронной библиотеки? (В этом случае: есть ли один, который вы бы порекомендовали для этой проблемы, и как будет происходить мультиплексирование между входящими сообщениями и исходящими рабочими ответами? Сегодня я не нашел чистого решения, пробующего комбинации, такие как Pika + ioloop - хотя я только что видел другой библиотека, stormed_amqp , которая может работать лучше, чем Пика.) Или мне действительно нужно вернуться к реальным живым потокам Python, если я хочу чистый и поддерживаемый код, который может задействовать эту модель? Я открыт для всех вариантов.

Спасибо за любую помощь или идеи! Я продолжаю думать, что у меня почти полностью отсутствует параллелизм в Python, и потом я снова узнаю, что это не так. :) И я надеюсь, что вам понравился рисунок ASCII выше в любом случае.

1 Ответ

5 голосов
/ 02 ноября 2011

После прочтения вашего поста и работы с gevent, похожим на библиотеку, похожую на eventlet, мне стало ясно несколько вещей, потому что я просто решил похожую проблему

Как правило, блокировка не требуется, поскольку когда-либо работает только один eventlet или гринлет, пока ни один из них не блокирует все, кажется, работает одновременно .. НО вы не хотите отправлять данные вниз сокет, пока другой гринлет отправляет. Вы правы и действительно нуждаетесь в блокировке для этого.

Если у меня есть вопросы, подобные этим, то просмотра в документации недостаточно ... зайдите в источник! в любом случае, с открытым исходным кодом вы узнаете намного больше, глядя на код других людей.

вот несколько упрощенных примеров кода, которые могут прояснить для вас.

в вашем диспетчере есть 2 очереди

self.worker_queue = Queue() # queue for messages to workers
self.server_queue = Queue() # queue for messages to ampq server

рабочие должны поместить свой результат в очередь на сервере.

код отправки и получения

def send_into_ampq():
    while True:
       message = dispatcher.get_workger_msg()

       try:
          connection.send(self.encode(message))
       except:
           connection.kill()

def read_from_ampq():
    while True:
        message = connection.wait()

        dispatcher.put_ampq_msg(self.decode(message))

в функции отправки кода вашего соединения

self._writelock = Semaphore(1) 
# this is a gevent locking thing. eventlet must have something like this too..
# just counts - 1 for locks and +1 for releases blocks otherwise blocks until 
# 0 agian.... why not google it i though.. and yes im right:
# eventlet.semaphore.Semaphore(value=1)

def send(self, message):
    """
    you need a write lock to prevent more greenlets
    sending more messages when previous sent is not done yet.
    """

    with self._writelock:
        self.socket.sendall(message)
...