Задание воздушного потока не запускается при выполнении правила запуска ONE_FAILED - PullRequest
0 голосов
/ 31 октября 2019

У меня есть группа обеспечения доступности баз данных на GCP Airflow с задачами, подобными приведенным ниже:

with DAG(dag_name, schedule_interval='0 6 * * *', default_args=default_dag_args) as dag:

    notify_start = po.PythonOperator(
        task_id = 'notify-on-start',
        python_callable = slack,
        op_kwargs={'msg': slack_start}
    )

    create_dataproc_cluster = d.create_cluster(default_dag_args['cluster_name'], service_account, num_workers)

    [assorted dataproc tasks]

    notify_on_fail = po.PythonOperator(
        task_id = 'notify-on-task-failure',
        python_callable = slack,
        op_kwargs={'msg': slack_error, 'err': True},
        trigger_rule = trigger_rule.TriggerRule.ONE_FAILED
    )

    delete_cluster = d.delete_cluster(default_dag_args['cluster_name'])

    notify_finish = po.PythonOperator(
        task_id = 'notify-on-completion',
        python_callable = slack,
        op_kwargs={'msg': slack_finish},
        trigger_rule = trigger_rule.TriggerRule.ALL_DONE
    )

    notify_start >> create_dataproc_cluster >> [assorted dataproc tasks >> delete_cluster >> notify_on_fail >> notify_finish

Проблема, с которой я сталкиваюсь, заключается в том, что в случае сбоя одной из задач dataproc задача notify_on_fail не запускается, несмотря на наличие* триггерное правило ONE_FAILED. Скорее, он раскручивает кластер и отправляет сообщение «Очистить все» (notify_finish). Мои задачи в неправильном порядке или что-то еще не так?

1 Ответ

0 голосов
/ 04 ноября 2019

Исходя из ваших ожиданий, я думаю, что DAG должен быть таким

notify_start >> create_dataproc_cluster >> [assorted dataproc tasks]
[assorted dataproc tasks] >> notify_on_fail >> delete_cluster
[assorted dataproc tasks] >> notify_finish >> delete_cluster
...