Ниже приведена простая копия созданной мной группы DAG. Группа обеспечения доступности баз данных имеет оператор ветвления для выбора потока выполнения, который объединяется в общую задачу. Задача должна сгенерировать список файлов, который будет использоваться для создания задачи для каждой записи в файле списка.
Проблема в том, что я не могу выполнить динамические задачи.
"""
Required packages to execute DAG
"""
from __future__ import print_function
from builtins import range
import airflow
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
import os
import sys
# DAG parameters
args = {
'owner': 'AD',
'depends_on_past': False,
'start_date': datetime(2018, 5, 30),
'end_date': datetime(9999, 12, 31),
'dagrun_timeout': None,
'timeout': None,
'execution_timeout': None,
'provide_context': True,
}
# create DAG object with Name and default_args (args can set in DAG definition or while execution/runtime)
dag = DAG('sodag', schedule_interval=None, default_args=args)
# Define task - below are examples of tasks created by instantiated by PythonOperator- calling methods written in other py clas
start = DummyOperator(task_id='start', dag=dag)
dummyjoin = DummyOperator(task_id='dummyjoin', dag=dag, trigger_rule=TriggerRule.ONE_SUCCESS)
multidummy = DummyOperator(task_id='multidummy', dag=dag)
def identify_pre_process(**context):
return 'task1'
def xcl_preq(filename, **kwargs):
return BashOperator(
task_id="so_dag{}".format(filename),
trigger_rule=TriggerRule.ONE_SUCCESS,
provide_context=True,
bash_command='echo "executing branch tasks"',
dag=dag)
with dag:
router = BranchPythonOperator(task_id='trigger_pre_process',
python_callable=identify_pre_process,
dag=dag)
task1 = BashOperator(
task_id="task1",
bash_command='echo "executing task1"',
execution_timeout=None,
dag=dag)
task2 = BashOperator(
task_id="task2",
bash_command='echo "executing task2"',
execution_timeout=None,
dag=dag)
with open('/root/filelist.txt', 'r') as fp:
for file in fp:
filename = os.path.basename(file)
dummyjoin >> xcl_preq(filename) >> multidummy
start >> router
router >> task1 >> dummyjoin
router >> task2 >> dummyjoin