Узнайте, существует ли задача сельдерея - PullRequest
40 голосов
/ 22 марта 2012

Можно ли узнать, существует ли задача с определенным идентификатором задачи? Когда я пытаюсь получить статус, я всегда получаю ожидание.

>>> AsyncResult('...').status
'PENDING'

Я хочу знать, является ли данный идентификатор задачи реальным идентификатором задачи сельдерея, а не случайной строкой. Я хочу получить разные результаты в зависимости от того, существует ли правильная задача для определенного идентификатора.

Возможно, в прошлом была допустимая задача с тем же идентификатором, но результаты могли быть удалены из серверной части.

Ответы [ 6 ]

29 голосов
/ 10 апреля 2012

Celery не записывает состояние при отправке задачи, это частично оптимизация (см. http://docs.celeryproject.org/en/latest/userguide/tasks.html#state).

Если вам это действительно нужно, просто добавить:

from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish

@after_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
    # the task may not exist if sent using `send_task` which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend

    backend.store_result(body['id'], None, "SENT")

Затем вы можете проверить состояние PENDING, чтобы обнаружить, что задача не была отправлена ​​(казалось бы):

>>> result.state != "PENDING"
9 голосов
/ 09 апреля 2012

AsyncResult.state возвращает PENDING в случае неизвестных идентификаторов задачи.

В ОЖИДАНИИ

Задача ожидает выполнения или неизвестна. Любой идентификатор задачи, который не является известное подразумевается в состоянии ожидания.

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

Вы можете предоставить собственные идентификаторы задач, если вам нужно отличить неизвестные идентификаторы от существующих:

>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
... 
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
... 
2 голосов
/ 10 апреля 2012

Сейчас я использую следующую схему:

  1. Получить идентификатор задачи.
  2. Установить для ключа memcache, например, «task_% s»% task.id сообщение «Запущено».
  3. Передать идентификатор задачи клиенту.
  4. Теперь с клиента я могу отслеживать состояние задачи (устанавливается из сообщений задачи в memcache).
  5. Из задачи в состояние готовности - установите для ключевого сообщения memcache «Готово».
  6. С клиента на задачу готова - запустите специальную задачу, которая удалит ключ из memcache и сделает необходимые действия по очистке.
0 голосов
/ 07 апреля 2012

Пожалуйста, поправьте меня, если я ошибаюсь.

if built_in_status_check(task_id) == 'pending'
   if registry_exists(task_id) == true
      print 'Pending'
   else
      print 'Task does not exist'
0 голосов
/ 06 апреля 2012

Вам нужно вызвать .get() для объекта AsyncTask, который вы создаете, чтобы фактически получить результат из внутреннего интерфейса.

См. Celery FAQ .


Чтобы уточнить мой ответ.

Любая строка технически является допустимым идентификатором, нет способа проверить идентификатор задачи.Единственный способ выяснить, существует ли задача, это спросить у бэкэнда, знает ли она об этом, и для этого необходимо использовать .get().

. Это создает проблему, которую .get() блокирует, когда бэкэнд не работает.У вас нет никакой информации об указанном вами идентификаторе задачи, это сделано для того, чтобы вы могли запустить задачу и затем дождаться ее завершения.

В случае исходного вопроса я предполагаю, чтоОП хочет получить состояние ранее выполненной задачи.Для этого вы можете передать очень маленький тайм-аут и перехватить ошибки тайм-аута:

from celery.exceptions import TimeoutError
try:
    # fetch the result from the backend
    # your backend must be fast enough to return
    # results within 100ms (0.1 seconds)
    result = AsyncResult('blubb').get(timeout=0.1)
except TimeoutError:
    result = None

if result:
    print "Result exists; state=%s" % (result.state,)
else:
    print "Result does not exist"

Само собой разумеется, что это работает, только если ваш бэкэнд хранит результаты, если нет, то нет способа узнать, является лиИдентификатор задачи действителен или нет, потому что ничего не хранит их запись.


Еще больше разъяснений.

То, что вы хотите сделать, не может быть выполнено с помощью бэкэнда AMQP, потому что он не сохраняет результаты, он пересылает их .

Я бы предложил переключиться на серверную часть базы данных, чтобы результаты находились в базе данных, которую можно запрашивать за пределами существующих модулей сельдерея.Если в базе данных результатов нет задач, можно предположить, что идентификатор недействителен.

0 голосов
/ 22 марта 2012

Попробуйте

AsyncResult('blubb').state

, что может работать.

Должно возвращаться что-то другое.

...