Я не ожидаю, что Airflow изменит DAG, когда DagRun активен, поэтому я не буду ставить деньги на получение файлов и добавление задач в той же DAG. При этом Airflow регенерирует DAG каждые несколько секунд. У вас может быть одна группа DAG, которая получает файлы, и другая группа DAG, которая обрабатывает эти файлы. После получения файлов первый DAG должен будет подождать минуту, чтобы убедиться, что Airflow заметит, а затем запустить второй DAG с TriggerDagRunOperator
.
.
DAG1:
def wait_a_minute():
time.sleep(60)
get_files = DummyOperator(dag=dag, task_id='get_files')
give_airflow_time_to_rebuild_DAG2 = PythonOperator(dag=dag, task_id='give_airflow_time_to_rebuild_DAG2', python_callable=wait_a_minute)
trigger_DAG2 = TriggerDagRunOperator(dag=dag, task_id='trigger_DAG2', trigger_dag_id='DAG2', execution_date='{{ ds }}')
get_files >> give_airflow_time_to_rebuild_DAG2 >> trigger_DAG2
DAG2:
pre_process = DummyOperator(dag=dag, task_id='pre_process')
post_process = DummyOperator(dag=dag, task_id='post_process')
files = get_files_to_process()
for file in files:
process = DummyOperator(dag=dag, task_id=f'process_{file}')
pre_process >> process >> post_process
Больше взломать, чем решение, но что-то вроде этого должно работать. Есть проблемы с внешними триггерами и динамическими задачами. Обычно я сталкиваюсь с проблемами планировщика, когда приходится использовать depends_on_past=True
.