Я хочу создать DAG Airflow с несколькими задачами, и я хочу, чтобы они выполнялись как этот образ Пример и диаграмма Ганта, например this и список некоторых описаний ниже:
- A-1, B-1, C-1 должны выполняться последовательно
- A-2 зависит от A-1, B-2 зависит от B-1, а C-2 зависит от C-1
- A-2, B-2, C-2 могут выполняться параллельно
Я создал желаемый DAG с помощью кода ниже
main_task_list = ["T1", "T2", "T3"]
def decide_what_to_do(table_name, **context):
if random.randint(0, 100) > 80:
return tid_prefix_zip_file + table_name
else:
return tid_prefix_do_nothing + table_name
def create_tasks_list(table_name):
tid_call_api = tid_prefix_call_api + table_name
py_op_call_api = DummyOperator(
task_id= tid_call_api
)
tid_branch_operator = tid_prefix_branch + table_name
py_op_new_data_come_in = BranchPythonOperator(
task_id=tid_branch_operator,
python_callable=decide_what_to_do,
op_args=[table_name]
)
tid_zip_file = tid_prefix_zip_file + table_name
ssh_op_zip_file = DummyOperator(
task_id=tid_zip_file
)
tid_upload_blob = tid_prefix_upload + table_name
ssh_op_upload_file = DummyOperator(
task_id=tid_upload_blob
)
tid_update_table_setting = tid_prefix_update_table + table_name
py_update_tables_setting = DummyOperator(
task_id=tid_update_table_setting
)
tid_execute_databricks = tid_prefix_call_databricks + table_name
db_op_execute_notebook = DummyOperator(
task_id=tid_execute_databricks
)
dummy_op_do_nothing = DummyOperator(
task_id= tid_prefix_do_nothing + table_name
)
# branch 1
first_pipeline = [py_op_call_api, py_op_new_data_come_in, ssh_op_zip_file, ssh_op_upload_file, py_update_tables_setting, db_op_execute_notebook]
airflow.utils.helpers.chain(*first_pipeline)
# branch 1
second_pipeline = [py_op_new_data_come_in, dummy_op_do_nothing]
airflow.utils.helpers.chain(*second_pipeline)
tasks_list = [first_pipeline, second_pipeline]
return tasks_list
with DAG(dag_id, default_args = default_args) as dag:
tasks_chain_list = [create_tasks_list(each) for each in main_task_list]
start = DummyOperator(
task_id="start"
)
start >> tasks_chain_list[0][0][0]
for n in range(0, len(tasks_chain_list)-1):
tasks_chain_list[n][0][0] >> tasks_chain_list[n+1][0][0]
Но этот код не является гибким, если я хочу добавить больше веток в каждую цепочку задач.
Кто-нибудь может помочь мне улучшить код?
Благодаря.