В моей DAG хотел пропустить задачу (oracle_merge_hist_orig), зависящую от флага.
Моя логика:
когда oracle_branch = True выполнить [merge_op, update_table_op, table_count_op]
когда oracle_branch = False execute [update_table_op, table_count_op]
Я попытался использовать BranchPythonOperator следующим образом:
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
oracle_branch = True
def branch_func():
if oracle_branch:
return "oracle_branch"
else:
return "normal_branch"
dag = DAG(
dag_id='example_branch_operator',
default_args=args,
schedule_interval="@daily",
)
branching_op = BranchPythonOperator(
task_id='branch_shall_run_oracle_merge_original_hist',
python_callable=branch_func,
dag= dag)
oracle_branch = DummyOperator(
task_id='oracle_branch',
dag=dag)
normal_branch = DummyOperator(
task_id='normal_branch',
dag=dag)
merge_op = DummyOperator(
task_id='oracle_merge_hist_orig',
dag=dag,
)
update_table_op = DummyOperator(
task_id='update_table_job',
dag=dag,
)
table_count_op = DummyOperator(
task_id='table_count',
dag=dag,
)
branching_op >> [oracle_branch,normal_branch]
normal_branch >> update_table_op >> table_count_op
oracle_branch >> merge_op >> update_table_op >> table_count_op
Однако вместо пропуска Задачи пропускается весь путь.
Как это исправить, чтобы я пропустил только задачу "racle_merge_hist_orig"?
Когда oracle_branch = False ![enter image description here](https://i.stack.imgur.com/sBmPt.png)
когда oracle_branch = True ![enter image description here](https://i.stack.imgur.com/0PqaR.png)