У меня есть группа обеспечения доступности баз данных на GCP Airflow с задачами, подобными приведенным ниже:
with DAG(dag_name, schedule_interval='0 6 * * *', default_args=default_dag_args) as dag:
notify_start = po.PythonOperator(
task_id = 'notify-on-start',
python_callable = slack,
op_kwargs={'msg': slack_start}
)
create_dataproc_cluster = d.create_cluster(default_dag_args['cluster_name'], service_account, num_workers)
[assorted dataproc tasks]
notify_on_fail = po.PythonOperator(
task_id = 'notify-on-task-failure',
python_callable = slack,
op_kwargs={'msg': slack_error, 'err': True},
trigger_rule = trigger_rule.TriggerRule.ONE_FAILED
)
delete_cluster = d.delete_cluster(default_dag_args['cluster_name'])
notify_finish = po.PythonOperator(
task_id = 'notify-on-completion',
python_callable = slack,
op_kwargs={'msg': slack_finish},
trigger_rule = trigger_rule.TriggerRule.ALL_DONE
)
notify_start >> create_dataproc_cluster >> [assorted dataproc tasks >> delete_cluster >> notify_on_fail >> notify_finish
Проблема, с которой я сталкиваюсь, заключается в том, что в случае сбоя одной из задач dataproc задача notify_on_fail
не запускается, несмотря на наличие* триггерное правило ONE_FAILED
. Скорее, он раскручивает кластер и отправляет сообщение «Очистить все» (notify_finish
). Мои задачи в неправильном порядке или что-то еще не так?