Как проверить статус задания в сельдерее? - PullRequest
83 голосов
/ 27 января 2012

Как можно проверить, выполняется ли задание в сельдерее (в частности, я использую celery-django)?

Я прочитал документацию и погуглил, но не могуувидеть вызов как:

my_example_task.state() == RUNNING

Мой пример использования - у меня есть внешний (java) сервис для транскодирования.Когда я отправляю документ для перекодирования, я хочу проверить, запущена ли задача, которая запускает этот сервис, и, если нет, (пере) запустить его.

Я использую текущие стабильные версии - 2.4Я верю.

Ответы [ 10 ]

81 голосов
/ 27 января 2012

Верните task_id (полученный из .delay ()) и спросите экземпляр сельдерея о состоянии:

x = method.delay(1,2)
print x.task_id

При запросе получите новый AsyncResult, используя этот task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
55 голосов
/ 28 января 2012

Каждый Task объект имеет свойство .request, в котором содержится AsyncRequest объект. Соответственно, следующая строка дает состояние Задачи task:

task.AsyncResult(task.request.id).state
53 голосов
/ 08 июля 2016

Создание объекта AsyncResult из идентификатора задачи - это способ, рекомендованный в FAQ для получения статуса задачи, когда у вас есть только идентификатор задачи.

Однако, начиная с Celery 3.x, существуют значительные предостережения, которые могут кусать людей, если они не обращают на них внимания. Это действительно зависит от конкретного сценария использования.

По умолчанию Celery не записывает «запущенное» состояние.

Для того, чтобы Celery мог записать, что задание выполняется, вы должны установить task_track_started на True. Вот простая задача, которая проверяет это:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Когда task_track_started равен False, что является значением по умолчанию, показывается состояние PENDING, даже если задача запущена. Если вы установите task_track_started на True, то состояние будет STARTED.

Состояние PENDING означает «я не знаю».

AsyncResult с состоянием PENDING не означает ничего больше, чем то, что Celery не знает статус задания. Это может быть по ряду причин.

С одной стороны, AsyncResult может быть создан с неверными идентификаторами задач. Такие "задачи" будут считаться ожидающими выполнения сельдереем:

>>> task.AsyncResult("invalid").status
'PENDING'

Хорошо, так что никто не собирается вводить очевидно недействительных идентификаторов на AsyncResult. Справедливо, но это также имеет эффект, что AsyncResult также будет считать задачу, которая была успешно запущена, но что Celery забыл быть PENDING. Опять, в некоторых сценариях использования это может быть проблемой. Часть проблемы зависит от того, как настроен Celery для сохранения результатов задач, потому что это зависит от наличия «надгробий» в бэкенде результатов. («Надгробия» - это термин, используемый в документации Celery для блоков данных, в которых записывается, как завершилось задание.) Использование AsyncResult не будет работать вообще, если task_ignore_result равно True. Еще более неприятная проблема заключается в том, что Celery по умолчанию выбрасывает надгробия. Настройка result_expires по умолчанию установлена ​​на 24 часа. Поэтому, если вы запускаете задачу и записываете идентификатор в долгосрочное хранилище, и еще через 24 часа, вы создаете с ним AsyncResult, статус будет PENDING.

Все "реальные задачи" запускаются в состоянии PENDING. Таким образом, получение PENDING для задания может означать, что задание было запрошено, но никогда не продвигалось дальше, чем это (по любой причине). Или это может означать, что задание выполнено, но Сельдери забыл свое состояние.

Ой! AsyncResult не будет работать для меня. Что еще я могу сделать?

Я предпочитаю отслеживать голов , чем отслеживать самих задач . Я храню некоторую информацию о задачах, но она действительно вторична для отслеживания целей. Цели хранятся в хранилище независимо от сельдерея. Когда запрос должен выполнить вычисление, зависит от того, какая цель была достигнута, он проверяет, была ли цель уже достигнута, если да, то он использует эту кэшированную цель, в противном случае он запускает задачу, которая повлияет на цель, и отправляет клиент, который сделал HTTP-запрос ответом, который указывает, что он должен ждать результата.


Приведенные выше имена переменных и гиперссылки относятся к Celery 4.x. В 3.x соответствующие переменные и гиперссылки: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.

15 голосов
/ 03 января 2015

Вы также можете создавать собственные состояния и обновлять их значение при выполнении задачи. Этот пример из документов:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

9 голосов
/ 10 июля 2016

Старый вопрос, но я недавно столкнулся с этой проблемой.

Если вы пытаетесь получить task_id, вы можете сделать это следующим образом:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Теперь вы точно знаете, что такое task_id и теперь можете использовать его для получения AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
2 голосов
/ 29 ноября 2018

Просто используйте этот API из сельдерея FAQ

result = app.AsyncResult(task_id)

Это отлично работает.

0 голосов
/ 05 июня 2018

Помимо вышесказанного Программный подход Использование статуса «Цветочная задача» можно легко увидеть.

Мониторинг в режиме реального времени с использованием событий Celery.Flower - это веб-инструмент для мониторинга и администрирования кластеров Celery.

  1. Ход выполнения задачи и история
  2. Возможность отображения сведений о задаче (аргументы, время запуска, время выполнения и т. Д.)
  3. Графики и статистика

Официальный документ: Инструмент мониторинга цветов и сельдерея

Установка:

$ pip install flower

Использование:

http://localhost:5555
0 голосов
/ 13 июля 2017

Я нашел полезную информацию в

Руководстве для рабочих-проектировщиков сельдерея

В моем случае я проверяю, работает ли Celery.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Вы можете играть с Inspect, чтобы получить ваши потребности.

0 голосов
/ 31 марта 2017

для простых задач, мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/ для мониторинга.

и для сложных задач, скажем, задача, которая имеет дело с множеством других модулей.Мы рекомендуем вручную записывать ход выполнения и сообщения в конкретном блоке задач.

0 голосов
/ 07 мая 2016

Попробуйте:

task.AsyncResult(task.request.id).state

Это обеспечит статус задачи сельдерея.Если задача Celery уже находится в состоянии FAILURE , она выдаст исключение:

raised unexpected: KeyError('exc_type',)

...