Airflow - анализировать идентификатор задачи из обратного вызова контекста dag - PullRequest
0 голосов
/ 04 июля 2018

Сначала работая с dag callback (on_failure_callback и on_success_callback), я думал, что это вызовет статусы success или fail, когда закончится dag (как это определено в dag). Но затем он создается для каждого task instance, а не dag run, поэтому, если группа обеспечения доступности баз данных имеет N задач, она будет вызывать эти обратные вызовы N раз.

Я пытаюсь поймать идентификатор задачи и отправить его на провал. Читая еще один связанный вопрос Я придумал следующее:

def success_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

def failure_msg(context):
    slack.slack_message(context['task_instance']); #send task-id to slack

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}

Но это не помогает, как мне разобрать переменные контекста и чтобы мне было позволено получить идентификатор задачи?

1 Ответ

0 голосов
/ 04 июля 2018

Вы можете получить доступ к задаче с помощью объекта задачи из контекста.

context['task'] должен быть подходящим способом сделать это. Чтобы получить имя задачи, используйте task_id:

context['task'].task_id

Чтобы найти больше объектов, доступных в контексте, вы можете просмотреть список здесь: https://airflow.apache.org/macros.html

...