Сначала работая с dag callback
(on_failure_callback
и on_success_callback
), я думал, что это вызовет статусы success
или fail
, когда закончится dag
(как это определено в dag).
Но затем он создается для каждого task instance
, а не dag run
, поэтому, если группа обеспечения доступности баз данных имеет N задач, она будет вызывать эти обратные вызовы N раз.
Я пытаюсь поймать идентификатор задачи и отправить его на провал. Читая еще один связанный вопрос Я придумал следующее:
def success_msg(context):
slack.slack_message(context['task_instance']); #send task-id to slack
def failure_msg(context):
slack.slack_message(context['task_instance']); #send task-id to slack
default_args = {
[...]
'on_failure_callback': failure_msg,
'on_success_callback': success_msg,
[...]
}
Но это не помогает, как мне разобрать переменные контекста и чтобы мне было позволено получить идентификатор задачи?