Воздушный поток: как запустить операторов параллельно после завершения первого оператора - PullRequest
0 голосов
/ 19 октября 2018

В настоящее время у меня есть DAG, состоящая из 4 операторов, как показано ниже:

with DAG('dag', default_args=args, schedule_interval=schedule_interval, catchup=True) as dag:
main_dag = PythonOperator(
    task_id='1',
    python_callable=func,
    provide_context=True,
    dag=dag)

run_after_main_dag_1 = PythonOperator(
    task_id='1',
    python_callable=foo,
    provide_context=True,
    dag=dag)

run_after_main_dag_2 = BranchPythonOperator(
    task_id='2',
    python_callable=foo,
    provide_context=True)

run_after_main_dag_2_2 = PythonOperator(
    task_id='3',
    python_callable=foo,
    provide_context=False,
    dag=dag)

#this runs sequential, but shouldn't.
main_dag >> run_after_main_dag_1 >> run_after_main_dag_2 >> run_after_main_dag_2_2

Вот чего я хотел бы достичь:

  1. Выполнить main_dagоператор

  2. Как только main_dag закончится, запустите run_after_main_dag_1 и run_after_main_dag_2 параллельно, так как они не зависят друг от друга.

Я просто не могу найти, как добиться этого в документации нигде.Должен быть простой синтаксис, который я полностью игнорировал.

Кто-нибудь, кто знает, как это сделать?

Ответы [ 2 ]

0 голосов
/ 19 октября 2018

In Airflow >> и << используются для настройки зависимостей нисходящего и восходящего потоков.

Вы кодируете

main_dag >> run_after_main_dag_1 >> run_after_main_dag_2 >> run_after_main_dag_2_2 #sequentially

Это фактически определение отношения, которое выполняется последовательно, поскольку восходящий поток run_after_main_dag_1 установлен на main_dag и так далее.

Чтобы разделить run_after_main_dag_1 и run_after_main_dag_2, вы можете определить отношения таким образом, чтобы у обоих была восходящая задача, как main_dag

main_dag >> run_after_main_dag_1 # It is just dependent on main_dag
main_dag >> run_after_main_dag_2 # It is just dependent on main_dag

Затем они будут запускать две задачи параллельнокак только задача main_dag завершит свое выполнение

0 голосов
/ 19 октября 2018

Итак, был простой ответ:

main_dag >> run_after_main_dag_1
main_dag >> run_after_main_dag_2 >> run_after_main_dag_2_2
...