Имея дело с большим количеством задач, связанных со сложными зависимостями, я обнаружил, что обычно в конечном итоге повторяю довольно много «шаблона задачи», как вы показали в своем примере.
В этих ситуациях мне нравится чтобы позволить Python сделать «тяжелую работу» по созданию задач и их подключению:
default_args = {
"oracle_conn_id": "oracle_con"
}
task_dict = {
"ihn_reference_raw": {"proc": "task1"},
"aim_codelist_raw": {"proc": "task2"},
"decline_reason_dim_build": {"proc": "task3",
"upstream": ["ihn_reference_raw",
"aim_codelist_raw"]},
"decline_reason_dim_load": {"proc": "task4",
"upstream": ["decline_reason_dim_build"]}
}
...
with DAG(
...,
default_args=default_args
) as dag:
# Iterate the details to create the tasks
for task_id, details in task_dict.items():
OracleOperator(task_id=f"run_proc_{task_id}",
sql=f"BEGIN {details['proc']}; END;")
# Iterate a second time to "wire up" the upstream tasks.
for task_id, details in task_dict.items():
if task_up := details.get("upstream"):
dag.get_task(f"run_proc_{task_id}").set_upstream(task_up)
(я пропустил совсем немного для краткости, но идея есть)
Ключ в том, чтобы найти повторяющиеся части вашего процесса, сохранить элементы, уникальные для каждой задачи ( в нашем task_dict
в этом примере ), а затем l oop для построения .