правило триггера воздушного потока none_failed не работает - PullRequest
0 голосов
/ 15 октября 2019

У меня есть рабочий процесс ниже. Что я хочу сделать, так это запустить задачу join, если все родители в успешном или пропущенном состоянии. Для этого я использую none_failed триггерное правило, но возникает проблема, когда все исходящие задачи в состоянии skipped, поэтому последующая задача join автоматически пропускается. Есть ли какое-либо решение, чтобы я мог запустить задачу join, если все родители skipped? Правило триггера all_done мне не подходит. Рабочий поток воздуха

dag = DAG(
    dag_id="branch_without_trigger",
    schedule_interval="@once",
    start_date=dt.datetime(2019, 2, 28),
)


def fun():
    raise AirflowSkipException("exception")


run_this_first = DummyOperator(task_id="run_this_first", dag=dag)
branching = DummyOperator(task_id="branching", dag=dag, python_callable=fun)

branch_a = DummyOperator(task_id="branch_a", dag=dag)
follow_branch_a = PythonOperator(
    task_id="follow_branch_a", dag=dag, python_callable=fun
)

branch_b = PythonOperator(task_id="branch_b", dag=dag, python_callable=fun)

join = DummyOperator(task_id="join", dag=dag, trigger_rule="none_failed")

run_this_first.set_downstream(branching)
branching >> branch_a >> follow_branch_a >> join
branching >> branch_b >> join

...