Восстановление после сбоя задачи за пределами max_retries - PullRequest
20 голосов
/ 28 июня 2011

Я пытаюсь асинхронно использовать веб-службу, поскольку для возврата требуется до 45 секунд.К сожалению, этот веб-сервис также ненадежен и может выдавать ошибки.Я настроил django-celery и выполняю свои задачи, которые работают нормально, пока задача не выходит за пределы max_retries.

Вот что у меня есть:

@task(default_retry_delay=5, max_retries=10)
def request(xml):
    try:
        server = Client('https://www.whatever.net/RealTimeService.asmx?wsdl')
        xml = server.service.RunRealTimeXML(
            username=settings.WS_USERNAME,
            password=settings.WS_PASSWORD,
            xml=xml
        )
    except Exception, e:
        result = Result(celery_id=request.request.id, details=e.reason, status="i")
        result.save()
        try:
            return request.retry(exc=e)
        except MaxRetriesExceededError, e:
            result = Result(celery_id=request.request.id, details="Max Retries Exceeded", status="f")
            result.save()
            raise
    result = Result(celery_id=request.request.id, details=xml, status="s")
    result.save()
    return result

К сожалению, MaxRetriesExceededError не выбрасывается retry(), поэтому я не уверен, как справиться с ошибкой этогозадача.Django уже вернул HTML клиенту, и я проверяю содержимое Result через AJAX, который никогда не достигает полного состояния отказа f.

Таким образом, вопрос: как я могу обновить свою базу данных, когда задача Celery превысила max_retries?

Ответы [ 4 ]

15 голосов
/ 27 февраля 2016

Проблема в том, что сельдерей пытается повторно поднять исключение, которое вы передали, когда оно достигает предела повторных попыток.Код для этого повторного поднятия приведен здесь: https://github.com/celery/celery/blob/v3.1.20/celery/app/task.py#L673-L681

Самый простой способ обойти это - просто не делать так, чтобы сельдерей вообще управлял вашими исключениями:

@task(max_retries=10)
def mytask():
    try:
        do_the_thing()
    except Exception as e:
        try:
            mytask.retry()
        except MaxRetriesExceededError:
            do_something_to_handle_the_error()
            logger.exception(e)
15 голосов
/ 20 февраля 2012

С версией Celery 2.3.2 этот подход хорошо сработал для меня:

class MyTask(celery.task.Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self.max_retries == self.request.retries:
            #If max retries is equal to task retries do something

@task(base=MyTask, default_retry_delay=5, max_retries=10)
def request(xml):
    #Your stuff here
14 голосов
/ 28 июня 2011

Вы можете переопределить метод after_return класса задачи сельдерея, этот метод вызывается после выполнения задачи независимо от состояния ret (SUCCESS, FAILED, RETRY)

class MyTask(celery.task.Task)

    def run(self, xml, **kwargs)
        #Your stuffs here

    def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
        if self.max_retries == int(kwargs['task_retries']):
            #If max retries are equals to task retries do something
        if status == "FAILURE":
            #You can do also something if the tasks fail instead of check the retries

http://readthedocs.org/docs/celery/en/latest/reference/celery.task.base.html#celery.task.base.BaseTask.after_return

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return

6 голосов
/ 23 мая 2014

Сейчас я просто занимаюсь этим, избавляет меня от работы над подклассами Task и легко понят.

# auto-retry with delay as defined below. After that, hook is disabled.
@celery.shared_task(bind=True, max_retries=5, default_retry_delay=300)
def post_data(self, hook_object_id, url, event, payload):
    headers = {'Content-type': 'application/json'}
    try:
        r = requests.post(url, data=payload, headers=headers)
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        if self.request.retries >= self.max_retries:
            log.warning("Auto-deactivating webhook %s for event %s", hook_object_id, event)
            Webhook.objects.filter(object_id=hook_object_id).update(active=False)
            return False
        raise self.retry(exc=e)
    return True
...