Я пытаюсь использовать Celery в качестве канала управления для приложения Twisted. Приложение My Twisted - это уровень абстракции, который обеспечивает стандартный интерфейс для различных локально запущенных процессов (через ProcessProtocol). Я хотел бы использовать Celery для удаленного управления - AMQP кажется идеальным методом управления многими приложениями Twisted из центрального местоположения, и я хотел бы воспользоваться преимуществами функций Celery, основанных на задачах, например, повторные задания, подзадачи и т. д.
Это не работает, как я планировал, и я надеюсь, что кто-то может помочь мне указать правильное направление, чтобы заставить это работать.
Поведение, которого я пытаюсь достичь при запуске сценария:
- Запустите слегка модифицированный сельдерей (см.
ниже)
- Ожидание заданий из сельдерея
- Когда получена задача 'start process', порождает ProcessProtocol
- Когда получены другие задачи, запустите функцию по протоколу Twisted и верните результат, используя Deferreds
Слегка модифицированный celeryd - это celeryd с небольшой модификацией, которая позволяет задачам получать доступ к реактору Twisted через self.app.twisted, а порожденный процесс - через self.app.process. Для простоты я использую имплантацию пула процессов Celery 'соло', которая не создает новый процесс для рабочих задач.
Моя проблема возникает, когда я пытаюсь использовать задачу Celery для инициализации ProcessProtocol (т. Е. Запуска внешнего процесса). Процесс запускается правильно, но childDataReceived ProcessProtocol никогда не вызывается. Я думаю, что это связано с тем, что файловые дескрипторы не наследуются / устанавливаются правильно.
Ниже приведен пример кода, основанный на примере 'wc' в документации ProcessProtocol. Он включает в себя две задачи Celery - одну для запуска процесса wc, а другую для подсчета слов в некотором тексте (используя ранее запущенный процесс wc).
Этот пример довольно надуманный, но если я смогу заставить это работать, он послужит хорошей отправной точкой для реализации моих ProcessProtocols, которые являются длительными процессами, которые будут отвечать на команды, записанные в stdin.
Я проверяю это, сначала запустив демон Celery:
python2.6 mycelery.py -l info -P соло
Затем в другом окне запускается скрипт, который отправляет две задачи:
python2.6 command_test.py
Ожидаемое поведение command_test.py для выполнения двух команд - одна запускает процесс wc, а другая отправляет некоторый текст в CountWordsTask. Что на самом деле происходит:
- StartProcTask запускает процесс и получает «процесс запущен» в качестве ответа через Deffered
- CountWordsTask никогда не получает результат, потому что childDataReceived никогда не вызывается
Может кто-нибудь пролить свет на это или дать совет, как лучше всего использовать Celery в качестве канала управления для Twisted ProcessProtocols?
Было бы лучше написать реализацию ProcessPool с поддержкой Twisted для Celery? Является ли мой метод вызова WorkerCommand.execute_from_commandline через реактор.callLater правильным подходом для обеспечения того, чтобы все происходило внутри витой нити?
Я читал об AMPoule, который, я думаю, мог бы предоставить некоторые из этих функций, но хотел бы придерживаться Celery, если это возможно, поскольку я использую его в других частях моего приложения.
Буду признателен за любую помощь или помощь!
myceleryd.py
from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor
class MyCeleryApp(App):
def __init__(self, twisted, *args, **kwargs):
self.twisted = twisted
super(MyCeleryApp, self).__init__(*args, **kwargs)
def main():
get_my_app = partial(MyCeleryApp, reactor)
worker = WorkerCommand(get_app=get_my_app)
reactor.callLater(1, worker.execute_from_commandline)
reactor.run()
if __name__ == '__main__':
main()
protocol.py
from twisted.internet import protocol
from twisted.internet.defer import Deferred
class WCProcessProtocol(protocol.ProcessProtocol):
def __init__(self, text):
self.text = text
self._waiting = {} # Dict to contain deferreds, keyed by command name
def connectionMade(self):
if 'startup' in self._waiting:
self._waiting['startup'].callback('process started')
def outReceived(self, data):
fieldLength = len(data) / 3
lines = int(data[:fieldLength])
words = int(data[fieldLength:fieldLength*2])
chars = int(data[fieldLength*2:])
self.transport.loseConnection()
self.receiveCounts(lines, words, chars)
if 'countWords' in self._waiting:
self._waiting['countWords'].callback(words)
def processExited(self, status):
print 'exiting'
def receiveCounts(self, lines, words, chars):
print >> sys.stderr, 'Received counts from wc.'
print >> sys.stderr, 'Lines:', lines
print >> sys.stderr, 'Words:', words
print >> sys.stderr, 'Characters:', chars
def countWords(self, text):
self._waiting['countWords'] = Deferred()
self.transport.write(text)
return self._waiting['countWords']
tasks.py
from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
class StartProcTask(Task):
def run(self):
self.app.proc = WCProcessProtocol('testing')
self.app.proc._waiting['startup'] = Deferred()
self.app.twisted.spawnProcess(self.app.proc,
'wc',
['wc'],
usePTY=True)
return self.app.proc._waiting['startup']
class CountWordsTask(Task):
def run(self):
return self.app.proc.countWords('test test')