У меня была та же проблема, я не мог на 100% решить проблему «воздушным потоком», так как я думаю, что число задач и подзадач воздушного потока определяется в момент проверки DAG.И при проверке никакая задача не запускается, поэтому воздушный поток не знает заранее, сколько запланированных задач subdag.task.
Способ, которым я обошел эту проблему, может быть не лучшим (я открыт дляпредложения), но это работает:
main_dag.py
# imports omitted for brevity
def get_info_from_db():
# get info from db or somewhere else, this info will define the number of subdag tasks to run
return urls, names
dag = DAG(...)
urls, names = get_info_from_db()
# You may ignore the dummy operators
start = DummyOperator(task_id='start', default_args=args, dag=dag)
sub_section = SubDagOperator(
task_id='import-file',
subdag=imported_subdag(DAG_NAME, 'subdag-name', args, urls=urls, file_names=names),
default_args=args,
dag=dag,
)
end = DummyOperator(task_id='end', default_args=args, dag=dag)
start.set_downstream(sub_section)
section_1.set_downstream(end)
Тогда, наконец, у меня есть мой subdag.py (убедитесь, что его можно обнаружить из потока воздуха) на случай, если он находится в отдельном файле
# imports omitted for brevity
def fetch_files(file_url, file_name):
# get file and save it to disk
return file_location
# this is how I get info returned from the previous task: fetch_files
def validate_file(task_id, **kwargs):
ti = kwargs['ti']
task = 'fetch_file-{}'.format(task_id)
file_location = ti.xcom_pull(task_ids=task)
def imported_subdag(parent_dag_name, child_dag_name, args, urls, file_names):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval="@daily",
)
for i in range(len(urls)):
# the task name should also be dynamic in order not to have duplicates
validate_file_operator = PythonOperator(task_id='validate_file-{}'.format(i+1),
python_callable=validate_file,
provide_context=True, dag=dag_subdag, op_kwargs={'task_id': i + 1})
fetch_operator = PythonOperator(task_id='fetch_file-{}'.format(i+1),
python_callable=fetch_zip, dag=dag_subdag,
op_kwargs={'file_url': urls[i], 'file_name': file_names[i]})
fetch_operator.set_downstream(validate_file_operator)
return dag_subdag
По сути, моя логика заключается в том, что в момент проверки Airflow get_info_from_db()
выполняется, и все dag и subdags правильно планируются динамически.Если я добавляю или удаляю содержимое из базы данных, число задач, которые нужно выполнить, будет обновлено при следующей проверке dag.
Этот подход подходит для моего варианта использования, но я надеюсь, что в будущем Airflow поддержит эту функцию (динамическое количество задач / подзадач.задач) изначально.