Создавайте задачи в динамическом режиме в Airflow DAG - PullRequest
0 голосов
/ 01 июля 2019

Я хочу создать DAG Airflow с несколькими задачами, и я хочу, чтобы они выполнялись как этот образ Пример и диаграмма Ганта, например this и список некоторых описаний ниже:

  • A-1, B-1, C-1 должны выполняться последовательно
  • A-2 зависит от A-1, B-2 зависит от B-1, а C-2 зависит от C-1
  • A-2, B-2, C-2 могут выполняться параллельно

Я создал желаемый DAG с помощью кода ниже

main_task_list = ["T1", "T2", "T3"]

def decide_what_to_do(table_name, **context):
    if random.randint(0, 100) > 80:
        return tid_prefix_zip_file + table_name
    else:
        return tid_prefix_do_nothing + table_name

def create_tasks_list(table_name):

    tid_call_api = tid_prefix_call_api + table_name
    py_op_call_api = DummyOperator(
        task_id= tid_call_api
    )

    tid_branch_operator = tid_prefix_branch + table_name
    py_op_new_data_come_in = BranchPythonOperator(
        task_id=tid_branch_operator,
        python_callable=decide_what_to_do,
        op_args=[table_name]
    )

    tid_zip_file = tid_prefix_zip_file + table_name
    ssh_op_zip_file = DummyOperator(
        task_id=tid_zip_file
    )

    tid_upload_blob = tid_prefix_upload + table_name
    ssh_op_upload_file = DummyOperator(
        task_id=tid_upload_blob
    )

    tid_update_table_setting = tid_prefix_update_table + table_name
    py_update_tables_setting = DummyOperator(
        task_id=tid_update_table_setting
    )

    tid_execute_databricks = tid_prefix_call_databricks + table_name
    db_op_execute_notebook = DummyOperator(
        task_id=tid_execute_databricks
    )

    dummy_op_do_nothing = DummyOperator(
        task_id= tid_prefix_do_nothing + table_name
    )

    # branch 1
    first_pipeline = [py_op_call_api, py_op_new_data_come_in, ssh_op_zip_file, ssh_op_upload_file, py_update_tables_setting, db_op_execute_notebook]
    airflow.utils.helpers.chain(*first_pipeline)

    # branch 1
    second_pipeline = [py_op_new_data_come_in, dummy_op_do_nothing]
    airflow.utils.helpers.chain(*second_pipeline)

    tasks_list = [first_pipeline, second_pipeline]
    return tasks_list

with DAG(dag_id, default_args = default_args) as dag:
    tasks_chain_list = [create_tasks_list(each) for each in main_task_list]

    start = DummyOperator(
        task_id="start"
    )

start >> tasks_chain_list[0][0][0]
for n in range(0, len(tasks_chain_list)-1):
    tasks_chain_list[n][0][0] >> tasks_chain_list[n+1][0][0]

Но этот код не является гибким, если я хочу добавить больше веток в каждую цепочку задач. Кто-нибудь может помочь мне улучшить код? Благодаря.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...