Я использую сельдерей с django и rabbitmq для создания очереди сообщений.У меня также есть работник, который происходит с другой машины.В представлении django я запускаю процесс, подобный следующему:
def processtask(request, name):
args = ["ls", "-l"]
MyTask.delay(args)
return HttpResponse("Task set to execute.")
Моя задача настроена так:
class MyTask(Task):
def run(self, args):
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = p.communicate()
return out
Теперь у меня вопрос: как может быть брокер (мой проект django?) теперь получает вывод команды "ls -l", которую работник выполнил на своем компьютере.Я полагаю, что для работника лучше всего было бы вызывать функцию в брокере всякий раз, когда она готова отправить вывод от выполненной команды.
Я хотел бы получить выходные данные от работника асинхронно, а затем обновить веб-страницу с выходными данными, но это в другой раз.На данный момент я хотел бы только получить вывод от работника.
Обновление
Сейчас я добавил HTTP GET-запрос, который запускается в конце задачи, уведомляя веб-приложение о том, что задача выполнена - ятакже отправка task_id в http GET.Метод http GET вызывает представление django, которое создает AsyncResult и получает результат, но проблема в том, что при вызове result.get () я получаю следующую ошибку:
/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
"Polling results with transaction isolation level"
Anyидеи почему?Я не использую базу данных, потому что я использую rabbitmq с AMQP.
Update.
Я бы очень хотел использовать третий вариант, который кажется лучшим вариантом - для маленьких ибольшие возвращаемые значения.Вся моя задача выглядит следующим образом:
class MyTask(Task):
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self.webhost is not None:
conn = httplib.HTTPConnection(self.webhost, self.webport)
conn.request("HEAD", "/vuln/task/output/"+task_id)
def run(self, args, webhost=None, webport=None):
self.webhost = webhost
self.webport = webport
r = "This is a basic result string used for code clarity"
return r
Итак, я переопределил функцию after_return, которая также должна снять блокировку моей задачи, поскольку функция run () задачи уже вернула значение.В запросе HEAD я в основном вызываю функцию django, которая вызывает AsyncResult для task_id, которая должна предоставлять результат задачи.В моем случае я использовал произвольный результат для целей тестирования, поскольку он предназначен только для тестирования.
Я хотел бы знать, почему вышеприведенный код не работает.Я могу использовать on_success, но я не думаю, что это будет иметь значение - или это будет?