Цикл DAG Airflow - Как сделать каждую итерацию последовательной, а не параллельной - PullRequest
0 голосов
/ 24 мая 2019

У меня есть Apache Airflow DAG примерно так:

DAG_NAME='my_dag'
sections = ["0", "1", "2", "3"]

with DAG(DAG_NAME, default_args=default_args, schedule_interval=None) as dag:

        for s in sections:
            a = DummyOperator(task_id=f"section_{s}_start")
            b = SubDagOperator(task_id=f"init_{s}_subdag",subdag=init_section(DAG_NAME,f"init_{s}_subdag", default_args))
            c = SubDagOperator(task_id=f"process_{s}_subdag", subdag=process_section(DAG_NAME,f"process_{s}_subdag", default_args))
            d = SubDagOperator(task_id=f"update_{s}_subdag", subdag=update_section(DAG_NAME,f"update_{s}_subdag", default_args))
            e = DummyOperator(task_id=f"section_{s}_end")
            a>>b>>c>>d>>e

Этот код отображает мои задачи примерно так: enter image description here

enter image description here

Как мне сделать последовательность задач такой:

section_0_start >> init_0_subdag >> process_0_subdag >> update_0_subdag >> section_0_end section_0_end >> section_1_start section_1_start >> init_1_subdag >> process_1_subdag >> update_1_subdag >> section_1_end

.....

и т. Д. В последовательности из раздела 0, заканчивающегося задачами раздела 3

Спасибо

1 Ответ

0 голосов
/ 30 мая 2019

Измените цикл for следующим образом:

    previous_e = None
    for s in sections:
        a = ...
        ...
        e = ...
        if previous_e:
            previous_e >> a
        a>>b>>c>>d>>e
        previous_e = e
...