Мне нужно создать такой рабочий процесс, в котором задачи определены в файле json с их зависимой задачей и на основе зависимого условия в параметре задачи, эта задача должна создаваться динамически. например,
Мой Json файл
{
"workflow_name": "workflow_dag",
"schedule": "@once",
"tasks": [
{
"id": "task1",
"name": "vipul",
"dependson": ""
},
{
"id": "task2",
"name": "ajay",
"dependson": "task1"
},
{
"id": "task3",
"name": "ajay",
"dependson": "task1"
},
{
"id": "task4",
"name": "prakash",
"dependson": "task2"
},
{
"id": "task5",
"name": "shivendra",
"dependson": "task3"
}
]
}
что-то, что у меня есть, но он генерирует задачи последовательно из файла json.
def create_dag(dag_id,
schedule,
default_args,
conf,
):
dag = DAG(dag_id,default_args=default_args,
schedule_interval=schedule)
list = []
with dag:
for flow in conf['flows']:
for (key, value) in flow.items():
if key=='id':
tab = DummyOperator(
task_id=value,
dag=dag,
)
list.append(tab)
break
if len(list) > 1:
for i in range(0, len(list)):
if i + 1 <= len(list)-1:
list[i] >> list[i + 1]
return dag
with open('/home/vipul/airflow-workspace/airflow_home/dags/wk_config.json'
) as json_data:
conf = json.load(json_data)
schedule = conf['schedule']
dag_id = conf['workflow_name']
args = {
'owner': 'vipul',
'depends_on_past': False,
'start_date': datetime(2020, 6, 19),
'email': ['vipul@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'concurrency': 1,
'max_active_runs': 1,
}
globals()[dag_id] = create_dag(dag_id, schedule, args, conf)
мой вывод должно быть как
task1 ->task2 -> task4
->task3 -> task5
Может кто-нибудь помочь с этим.