Использование сельдерея в качестве канала управления для витых приложений - PullRequest
11 голосов
/ 15 ноября 2011

Я пытаюсь использовать Celery в качестве канала управления для приложения Twisted. Приложение My Twisted - это уровень абстракции, который обеспечивает стандартный интерфейс для различных локально запущенных процессов (через ProcessProtocol). Я хотел бы использовать Celery для удаленного управления - AMQP кажется идеальным методом управления многими приложениями Twisted из центрального местоположения, и я хотел бы воспользоваться преимуществами функций Celery, основанных на задачах, например, повторные задания, подзадачи и т. д.

Это не работает, как я планировал, и я надеюсь, что кто-то может помочь мне указать правильное направление, чтобы заставить это работать.

Поведение, которого я пытаюсь достичь при запуске сценария:

  • Запустите слегка модифицированный сельдерей (см. ниже)
  • Ожидание заданий из сельдерея
  • Когда получена задача 'start process', порождает ProcessProtocol
  • Когда получены другие задачи, запустите функцию по протоколу Twisted и верните результат, используя Deferreds

Слегка модифицированный celeryd - это celeryd с небольшой модификацией, которая позволяет задачам получать доступ к реактору Twisted через self.app.twisted, а порожденный процесс - через self.app.process. Для простоты я использую имплантацию пула процессов Celery 'соло', которая не создает новый процесс для рабочих задач.

Моя проблема возникает, когда я пытаюсь использовать задачу Celery для инициализации ProcessProtocol (т. Е. Запуска внешнего процесса). Процесс запускается правильно, но childDataReceived ProcessProtocol никогда не вызывается. Я думаю, что это связано с тем, что файловые дескрипторы не наследуются / устанавливаются правильно.

Ниже приведен пример кода, основанный на примере 'wc' в документации ProcessProtocol. Он включает в себя две задачи Celery - одну для запуска процесса wc, а другую для подсчета слов в некотором тексте (используя ранее запущенный процесс wc).

Этот пример довольно надуманный, но если я смогу заставить это работать, он послужит хорошей отправной точкой для реализации моих ProcessProtocols, которые являются длительными процессами, которые будут отвечать на команды, записанные в stdin.

Я проверяю это, сначала запустив демон Celery:

python2.6 mycelery.py -l info -P соло

Затем в другом окне запускается скрипт, который отправляет две задачи:

python2.6 command_test.py

Ожидаемое поведение command_test.py для выполнения двух команд - одна запускает процесс wc, а другая отправляет некоторый текст в CountWordsTask. Что на самом деле происходит:

  • StartProcTask запускает процесс и получает «процесс запущен» в качестве ответа через Deffered
  • CountWordsTask никогда не получает результат, потому что childDataReceived никогда не вызывается

Может кто-нибудь пролить свет на это или дать совет, как лучше всего использовать Celery в качестве канала управления для Twisted ProcessProtocols?

Было бы лучше написать реализацию ProcessPool с поддержкой Twisted для Celery? Является ли мой метод вызова WorkerCommand.execute_from_commandline через реактор.callLater правильным подходом для обеспечения того, чтобы все происходило внутри витой нити?

Я читал об AMPoule, который, я думаю, мог бы предоставить некоторые из этих функций, но хотел бы придерживаться Celery, если это возможно, поскольку я использую его в других частях моего приложения.

Буду признателен за любую помощь или помощь!

myceleryd.py

from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


class MyCeleryApp(App):
    def __init__(self, twisted, *args, **kwargs):
        self.twisted = twisted
        super(MyCeleryApp, self).__init__(*args, **kwargs)

def main():
    get_my_app = partial(MyCeleryApp, reactor)
    worker = WorkerCommand(get_app=get_my_app)
    reactor.callLater(1, worker.execute_from_commandline)
    reactor.run()

if __name__ == '__main__':
    main()

protocol.py

from twisted.internet import protocol
from twisted.internet.defer import Deferred

class WCProcessProtocol(protocol.ProcessProtocol):

    def __init__(self, text):
        self.text = text
        self._waiting = {} # Dict to contain deferreds, keyed by command name

    def connectionMade(self):
        if 'startup' in self._waiting:
            self._waiting['startup'].callback('process started')

    def outReceived(self, data):
        fieldLength = len(data) / 3
        lines = int(data[:fieldLength])
        words = int(data[fieldLength:fieldLength*2])
        chars = int(data[fieldLength*2:])
        self.transport.loseConnection()
        self.receiveCounts(lines, words, chars)

        if 'countWords' in self._waiting:
            self._waiting['countWords'].callback(words)

    def processExited(self, status):
        print 'exiting'


    def receiveCounts(self, lines, words, chars):
        print >> sys.stderr, 'Received counts from wc.'
        print >> sys.stderr, 'Lines:', lines
        print >> sys.stderr, 'Words:', words
        print >> sys.stderr, 'Characters:', chars

    def countWords(self, text):
        self._waiting['countWords'] = Deferred()
        self.transport.write(text)
        return self._waiting['countWords']

tasks.py

from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor

class StartProcTask(Task):
    def run(self):
        self.app.proc = WCProcessProtocol('testing')
        self.app.proc._waiting['startup'] = Deferred()
        self.app.twisted.spawnProcess(self.app.proc,
                                      'wc',
                                      ['wc'],
                                      usePTY=True)
        return self.app.proc._waiting['startup']

class CountWordsTask(Task):
    def run(self):
        return self.app.proc.countWords('test test')

1 Ответ

11 голосов
/ 15 ноября 2011

Сельдерей, вероятно, блокируется во время ожидания новых сообщений из сети. Поскольку вы запускаете его в однопоточном процессе вместе с витым реактором, он блокирует работу реактора. Это отключит большую часть Twisted, который требует, чтобы реактор действительно работал (вы назвали reactor.run, но из-за того, что Celery блокирует его, он фактически не работает).

reactor.callLater только задерживает запуск Celery. Как только Celery запускается, он все еще блокирует реактор.

Проблема, которую вам нужно избегать, - это блокировка реактора.

Одним из решений было бы запустить сельдерей в одном потоке, а реактор - в другом. Используйте reactor.callFromThread для отправки сообщений в Twisted («вызов функций в потоке реактора») из потока Celery. Используйте эквивалент Celery, если вам нужно отправлять сообщения обратно в Celery из витой ветки.

Другим решением было бы реализовать протокол Celery (AMQP? - см. txAMQP ) в качестве встроенной библиотеки Twisted и использовать ее для обработки сообщений Celery без блокировки.

...