Получение заданий из сельдерея из задачи не выполнено / uuid - PullRequest
1 голос
/ 29 июня 2019

Основная проблема

Я тестирую, как обрабатывать определенные сбои задач, например, обрабатывать исключение «TimeLimitExceeded», которое мгновенно убивает задачу и не является «перехватываемым» (Да ... Я в курсео существовании «SoftTimeLimit», но оно не соответствует моим потребностям).

Первый подход

Это мой tasks.py (рабочий работает с флагом --time-limit):

import logging
from celery import Celery
import time


app = Celery('tasks', broker='pyamqp://guest@localhost//')

def my_fail(task, exc, req_id, req_args, req_kwargs, einfo, *ext_args, **kwargs):
    logger.info("args: %r", req_args)
    logger.info("kw: %r", req_kwargs)

@app.task(on_failure=my_fail)
def sum(x, y, delay=0, **kw):
    result = x+y
    if result == 4:
        raise Exception("Some Error")
    time.sleep(delay)                                                               
    return x+y

Основная идея, когда задача не выполняется, чтобы иметь возможность выполнить некоторую обработку на основе аргументов / kwargs задачи

Например, если я запускаю sum.delay(3, 1, foo="bar") the Exception("Some Error")поднимается и регистрируется следующее:

[2019-06-30 17:21:45,120: INFO/Worker-1] args: (3, 1)
[2019-06-30 17:21:45,121: INFO/Worker-1] kw: {'foo': 'bar'}
[2019-06-30 17:21:45,122: ERROR/MainProcess] Task tasks.sum[9e9de032-1469-44e7-8932-4c490fcee2e3] raised unexpected: Exception('Some Error',)
Traceback (most recent call last):
  File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/apernin/test/tasks.py", line 89, in sum
    raise Exception("Some Error")
Exception: Some Error

Обратите внимание, что args / kwargs напечатаны моим on-failure обработчиком.

Теперь, если я запускаю sum.delay(3, 2, delay=7) TimeLimit срабатывает

[2019-06-30 17:23:15,244: INFO/MainProcess] Received task: tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde]
[2019-06-30 17:23:21,070: ERROR/MainProcess] Task tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde] raised unexpected: TimeLimitExceeded(5.0,)
Traceback (most recent call last):
  File "/home/apernin/.virtualenvs/dr/local/lib/python2.7/site-packages/billiard/pool.py", line 645, in on_hard_timeout
    raise TimeLimitExceeded(job._timeout)
TimeLimitExceeded: TimeLimitExceeded(5.0,)
[2019-06-30 17:23:21,071: ERROR/MainProcess] Hard time limit (5.0s) exceeded for tasks.sum[8c81398b-4378-401d-a674-a3bd3418ccde]
[2019-06-30 17:23:21,629: ERROR/MainProcess] Process 'Worker-1' pid:15472 exited with 'signal 15 (SIGTERM)'

Обратите внимание, что args / kwargs напечатаны примечанием, потому что обработчик on-failure не оправдан.Этого несколько ожидать из-за природы жесткого предела времени Celery.

Второй подход

Мой второй подход заключается в использовании прослушивателя событий.

from celery import Celery


def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Единственной информацией, которую мне удалось получить, была задача uuid , я не смог получить имя, аргументы или kwargs задачи (объект задачи содержит атрибуты, но все они отсутствуют).

Вопрос

Есть ли способ:

  • Сделать обработчик on_failure в случае жесткого ограничения времени?
  • Получить задачиargs / kwargs задачи с task-failed прослушивателем событий?

Заранее спасибо

1 Ответ

0 голосов
/ 03 июля 2019

Во-первых, тайм-аут обрабатывается рабочим (MainProcess), и он не обрабатывается так же, как сбои, произошедшие внутри задачи, такие как генерируемые исключения и т. Д. Вот почему вы видите его как TimeLimitExceeded, вызванный MainProcessв журнале.Так что, к сожалению, вы не можете полагаться на ту же логику ...

Однако ваш второй подход окажется полезным для отслеживания происходящего.

Я разработал (собственными силами) инструмент мониторинга Celery, который захватывает все события и заполняет ими базу данных, чтобы впоследствии мы могли выполнять все виды аналитики (см., например, среднее и наихудшее время работы, частоту сбоев и т. д.).

Чтобы получить необходимые данные из данных, предоставленных событием task-failed, вам также необходимо записать (например, сохранить их в каком-то словаре) данные события task-received.Эта информация содержит аргументы, имена задач и всю полезную информацию, которая может вам понадобиться.Вы связываете их обоих с помощью задачи UUID.

...