динамическое c создание дага на основе зависимостей из таблицы - PullRequest
0 голосов
/ 05 августа 2020

Я читаю из таблицы, которая содержит задачи, которые должны быть выполнены, и я также сохраняю зависимости в той же таблице. Я читаю таблицу во фрейм данных pandas.

image

for index, row in odf.iterrows():

 dag_id = row["DAG_ID"]
 task_id = row["TASK_ID"]
 task_name = row["TASK_NAME"]
 script_name = row["SCRIPT_NAME"]
 if row["DEPENDENT_ID"] is not None:
   dependents = row["DEPENDENT_ID"].split('|')
   print(dependents)

   t1 = OracleOperator(task_id=task_name,
                   oracle_conn_id='oracle_con',
                   sql='Begin %s; end;' % script_name, dag=dag)

   for d in dependents:

     for index, row in odf[odf["TASK_ID"] == int(d)].iterrows():
          t2 =  OracleOperator(task_id=row["TASK_NAME"],
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin %s; end;' %script_name,dag = dag)
          t1.set_upstream(t2)

, но мой результат не соответствует ожиданиям, и я вижу ниже. image

t1 = OracleOperator(task_id='run_proc_ihn_reference_raw',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task1; end;',dag = dag)

t2 = OracleOperator(task_id='run_proc_aim_codelist_raw',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task2; end;',dag = dag)

t3 = OracleOperator(task_id='run_proc_decline_reason_dim_build',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task3; end;',dag = dag)

t4 = OracleOperator(task_id='run_proc_decline_reason_dim_load',
                                       oracle_conn_id='oracle_con',
                                       sql= 'Begin proc.task4; end;',dag = dag)

(t1,t2) >> t3 >> t4

, но у меня может быть более 100 процедур, поэтому я ищу dag, который будет создан с зависимостями с использованием вышеуказанного метода.

нужна помощь для того же. Спасибо

1 Ответ

1 голос
/ 06 августа 2020

Имея дело с большим количеством задач, связанных со сложными зависимостями, я обнаружил, что обычно в конечном итоге повторяю довольно много «шаблона задачи», как вы показали в своем примере.

В этих ситуациях мне нравится чтобы позволить Python сделать «тяжелую работу» по созданию задач и их подключению:

default_args = {
    "oracle_conn_id": "oracle_con"
}

task_dict = {
    "ihn_reference_raw":        {"proc": "task1"},
    "aim_codelist_raw":         {"proc": "task2"},
    "decline_reason_dim_build": {"proc": "task3", 
                                 "upstream": ["ihn_reference_raw", 
                                              "aim_codelist_raw"]},
    "decline_reason_dim_load":  {"proc": "task4", 
                                 "upstream": ["decline_reason_dim_build"]}
}

...

with DAG(
    ..., 
    default_args=default_args
) as dag:

    # Iterate the details to create the tasks
    for task_id, details in task_dict.items():
        OracleOperator(task_id=f"run_proc_{task_id}",
                       sql=f"BEGIN {details['proc']}; END;")

    # Iterate a second time to "wire up" the upstream tasks.
    for task_id, details in task_dict.items():
        if task_up := details.get("upstream"):
            dag.get_task(f"run_proc_{task_id}").set_upstream(task_up)

(я пропустил совсем немного для краткости, но идея есть)

Ключ в том, чтобы найти повторяющиеся части вашего процесса, сохранить элементы, уникальные для каждой задачи ( в нашем task_dict в этом примере ), а затем l oop для построения .

...