Как заставить задачу сельдерея провалиться изнутри задачи? - PullRequest
30 голосов
/ 06 октября 2011

В некоторых условиях я хочу, чтобы задача сельдерея провалилась из-за этой задачи.Я попытался сделать следующее:

from celery.task import task
from celery import states

@task()
def run_simulation():
    if some_condition:
        run_simulation.update_state(state=states.FAILURE)
        return False

Однако задача все еще сообщает об успешном выполнении:

Задача sim.tasks.run_simulation [9235e3a7-c6d2-4219-bbc7-acf65c816e65]успешно в 1.17847704887s: False

Кажется, что состояние можно изменить только во время выполнения задачи и после ее завершения - сельдерей меняет состояние на то, что он считает результатом (см. этот вопрос ).Есть ли способ, не выполнив задачу, вызвав исключение, вернуть сельдерею, что задача не выполнена?

Ответы [ 3 ]

22 голосов
/ 15 октября 2015

Чтобы пометить задачу как невыполненную, не вызывая исключение, измените состояние задачи на FAILURE, а затем вызовите исключение Ignore, поскольку при возвращении любого значения задание будет считаться успешным, например:

from celery import Celery, states
from celery.exceptions import Ignore

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task(bind=True)
def run_simulation(self):
    if some_condition:
        # manually update the task state
        self.update_state(
            state = states.FAILURE,
            meta = 'REASON FOR FAILURE'
        )

        # ignore the task so no other state is recorded
        raise Ignore()

Но лучший способ - вызвать исключение из вашей задачи, вы можете создать собственное исключение для отслеживания этих сбоев:

class TaskFailure(Exception):
   pass

И поднимите это исключение из вашей задачи:

if some_condition:
    raise TaskFailure('Failure reason')
2 голосов
/ 11 октября 2011

Я получил интересный ответ на этот вопрос от Ask Solem, где он предлагает обработчик after_return для решения проблемы.Это может быть интересным вариантом в будущем.

Тем временем я решил проблему, просто вернув строку «Сбой» из задачи, когда я хочу, чтобы она не прошла, а затем проверив это следующим образом:

result = AsyncResult(task_id)
if result.state == 'FAILURE' or (result.state == 'SUCCESS' and result.get() == 'FAILURE'):
    # Failure processing task 
1 голос
/ 21 января 2019

Я хотел бы еще больше расширить ответ Пьера, так как я столкнулся с некоторыми проблемами при использовании предложенного решения.

Чтобы разрешить настраиваемые поля при обновлении состояния задачи до состояний.смоделируйте некоторые атрибуты, которые будет иметь состояние FAILURE (обратите внимание на exc_type и exc_message) Хотя решение завершит задачу, любая попытка запросить состояние (например, извлечь значение REASON FOR FAILURE) завершится неудачей.

Ниже приведен фрагмент для справки, который я взял из: https://www.distributedpython.com/2018/09/28/celery-task-states/

@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception as ex:
        self.update_state(
            state=states.FAILURE,
            meta={
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n')
                'custom': '...'
            })
        raise Ignore()
...