Воздушный поток: пропустить задачу с помощью ветвления - PullRequest
0 голосов
/ 25 октября 2019

В моей DAG хотел пропустить задачу (oracle_merge_hist_orig), зависящую от флага.

Моя логика:

когда oracle_branch = True выполнить [merge_op, update_table_op, table_count_op]

когда oracle_branch = False execute [update_table_op, table_count_op]

Я попытался использовать BranchPythonOperator следующим образом:

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}
oracle_branch = True
def branch_func():
    if oracle_branch:
        return "oracle_branch"
    else:
        return "normal_branch"

dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

branching_op = BranchPythonOperator(
    task_id='branch_shall_run_oracle_merge_original_hist',
    python_callable=branch_func,
    dag= dag)

oracle_branch = DummyOperator(
    task_id='oracle_branch',
    dag=dag)

normal_branch = DummyOperator(
    task_id='normal_branch',
    dag=dag)

merge_op = DummyOperator(
    task_id='oracle_merge_hist_orig',
    dag=dag,
)

update_table_op = DummyOperator(
    task_id='update_table_job',
    dag=dag,
)

table_count_op = DummyOperator(
    task_id='table_count',
    dag=dag,
)

branching_op >> [oracle_branch,normal_branch] 
normal_branch >> update_table_op >> table_count_op
oracle_branch >> merge_op >> update_table_op >> table_count_op

Однако вместо пропуска Задачи пропускается весь путь.

Как это исправить, чтобы я пропустил только задачу "racle_merge_hist_orig"?

Когда oracle_branch = False enter image description here

когда oracle_branch = True enter image description here

1 Ответ

1 голос
/ 30 октября 2019

Каждое задание будет иметь trigger_rule, которое по умолчанию установлено на all_success. Мы можем переопределить его для различных значений, которые перечислены здесь .

В вашей группе обеспечения доступности баз данных задача update_table_job имеет две исходные задачи. Так как одна из задач в восходящем направлении находится в состоянии skipped, она также перешла в состояние skipped. Мы можем избежать этого, переопределив значение по умолчанию от trigger_rule до one_success, как показано ниже.

update_table_op = DummyOperator(
    task_id='update_table_job',
    trigger_rule='one_success',
    dag=dag
)

enter image description here Примечание: Я проверял это на версии Airflow 1.10.4.

...