сельдерей - вызов функции по выполнению задачи - PullRequest
11 голосов
/ 06 марта 2012

Я использую сельдерей с django и rabbitmq для создания очереди сообщений.У меня также есть работник, который происходит с другой машины.В представлении django я запускаю процесс, подобный следующему:

def processtask(request, name):
  args = ["ls", "-l"]
  MyTask.delay(args)
  return HttpResponse("Task set to execute.")

Моя задача настроена так:

class MyTask(Task):
  def run(self, args):
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    return out

Теперь у меня вопрос: как может быть брокер (мой проект django?) теперь получает вывод команды "ls -l", которую работник выполнил на своем компьютере.Я полагаю, что для работника лучше всего было бы вызывать функцию в брокере всякий раз, когда она готова отправить вывод от выполненной команды.

Я хотел бы получить выходные данные от работника асинхронно, а затем обновить веб-страницу с выходными данными, но это в другой раз.На данный момент я хотел бы только получить вывод от работника.

Обновление

Сейчас я добавил HTTP GET-запрос, который запускается в конце задачи, уведомляя веб-приложение о том, что задача выполнена - ятакже отправка task_id в http GET.Метод http GET вызывает представление django, которое создает AsyncResult и получает результат, но проблема в том, что при вызове result.get () я получаю следующую ошибку:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  "Polling results with transaction isolation level"

Anyидеи почему?Я не использую базу данных, потому что я использую rabbitmq с AMQP.

Update.

Я бы очень хотел использовать третий вариант, который кажется лучшим вариантом - для маленьких ибольшие возвращаемые значения.Вся моя задача выглядит следующим образом:

class MyTask(Task):
  def __call__(self, *args, **kwargs):
    return self.run(*args, **kwargs)

  def after_return(self, status, retval, task_id, args, kwargs, einfo):
    if self.webhost is not None:
      conn = httplib.HTTPConnection(self.webhost, self.webport)
      conn.request("HEAD", "/vuln/task/output/"+task_id)

  def run(self, args, webhost=None, webport=None):
    self.webhost = webhost
    self.webport = webport
    r = "This is a basic result string used for code clarity"
    return r

Итак, я переопределил функцию after_return, которая также должна снять блокировку моей задачи, поскольку функция run () задачи уже вернула значение.В запросе HEAD я в основном вызываю функцию django, которая вызывает AsyncResult для task_id, которая должна предоставлять результат задачи.В моем случае я использовал произвольный результат для целей тестирования, поскольку он предназначен только для тестирования.

Я хотел бы знать, почему вышеприведенный код не работает.Я могу использовать on_success, но я не думаю, что это будет иметь значение - или это будет?

1 Ответ

15 голосов
/ 07 марта 2012

Если вы посмотрите здесь , вы найдете следующее:

Django-celery использует MySQL для отслеживания всех задач / результатов, а rabbit-mq в основном используется в качестве коммуникационной шины.

На самом деле происходит то, что вы пытаетесь получить ASyncResult работника, пока задача еще выполняется (задача вызвала HTTP-запрос к вашему серверу, и, поскольку она еще не вернулась, сеанс блокировки БД от работника все еще активен, а строка результатов все еще заблокирована). Когда Django пытается прочитать результат задачи (ее состояние и фактическое возвращаемое значение функции run), он находит строку заблокированной и выдает предупреждение.

Есть несколько способов решить эту проблему:

  1. Настройте еще одну задачу сельдерея, чтобы получить результат и связать его с задачей обработки. Таким образом, оригинальное задание завершится, снимите блокировку на БД, и новое получит его, прочитает результат в django и сделает все, что вам нужно. Посмотрите документы по сельдерею на этом.

  2. Не беспокойтесь, просто сделайте POST для Django с полным результатом обработки, прикрепленным как полезная нагрузка, вместо того, чтобы пытаться получить его через db.

  3. Переопределите on_success в вашем классе задач и отправьте запрос на уведомление Django, после чего блокировка должна быть снята с таблицы db.

Обратите внимание, что вам нужно сохранить весь результат обработки (независимо от того, насколько он велик) в возвращаемом методе выполнения (возможно, зарезанном) Вы не упомянули, насколько большим может быть результат, поэтому может иметь смысл на самом деле просто выполнить сценарий № 2 выше (что я и сделал бы). В качестве альтернативы я бы пошел с # 3. Также не забудьте обработать метод on_failure в своей задаче.

...