как реализовать поток воздуха DAG в петле - PullRequest
0 голосов
/ 03 мая 2019

Я только начал с Airflow. Я хочу настроить группу обеспечения доступности баз данных в цикле, где следующая группа обеспечения доступности баз данных начинается, когда завершается предыдущая группа обеспечения доступности баз данных. Вот рабочий процесс, которого я хочу достичь:

list_of_files = [......]
for file in list_of_files:
   dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
   t1 = BashOperator('copy_this_file', ....)
   t2 = BashOperator('process_this_file', ...)
   t1.set_downstream(t2)

Если я запускаю airflow backfill pipeline -s 2019-05-01, все группы доступности баз данных запускаются одновременно.

1 Ответ

1 голос
/ 07 мая 2019

DAG не могут зависеть друг от друга, это отдельные рабочие процессы. Вместо этого вы хотите настроить задачи так, чтобы они зависели друг от друга. У вас может быть одна группа доступности базы данных с несколькими ветвями выполнения, по одной для каждого файла, что-то вроде этого (не проверено):

dag = DAG('pipeline', ...)
list_of_files = [......]
with dag:
    for file in list_of_files:
       t1 = BashOperator('copy_this_file', ....)
       t2 = BashOperator('process_this_file', ...)
       t1.set_downstream(t2)
...