У меня есть рабочий процесс ниже. Что я хочу сделать, так это запустить задачу join
, если все родители в успешном или пропущенном состоянии. Для этого я использую none_failed
триггерное правило, но возникает проблема, когда все исходящие задачи в состоянии skipped
, поэтому последующая задача join
автоматически пропускается. Есть ли какое-либо решение, чтобы я мог запустить задачу join
, если все родители skipped
? Правило триггера all_done
мне не подходит. Рабочий поток воздуха
dag = DAG(
dag_id="branch_without_trigger",
schedule_interval="@once",
start_date=dt.datetime(2019, 2, 28),
)
def fun():
raise AirflowSkipException("exception")
run_this_first = DummyOperator(task_id="run_this_first", dag=dag)
branching = DummyOperator(task_id="branching", dag=dag, python_callable=fun)
branch_a = DummyOperator(task_id="branch_a", dag=dag)
follow_branch_a = PythonOperator(
task_id="follow_branch_a", dag=dag, python_callable=fun
)
branch_b = PythonOperator(task_id="branch_b", dag=dag, python_callable=fun)
join = DummyOperator(task_id="join", dag=dag, trigger_rule="none_failed")
run_this_first.set_downstream(branching)
branching >> branch_a >> follow_branch_a >> join
branching >> branch_b >> join