У меня есть AIRFLOW DAG со следующей структурой.

Все функции, которые начинаются с «check *» - это BranchPythonOperator и функцияexceptionControl - это ExecuteDagRunOperator, который получает каждую ошибку, чтобы обработать их.
Это конфигурация DAG:
checkCloudFunctions = BranchPythonOperator(
task_id='checkCloudFunctions',
python_callable=check_cloud_functions,
provide_context=True,
dag=dag)
checkSqlTables = BranchPythonOperator(
task_id='checkSqlTables',
python_callable=check_sql_tables,
provide_context=True,
dag=dag)
checkBigQueryTable = BranchPythonOperator(
task_id='checkBigQueryTable',
python_callable=check_big_query_table,
provide_context=True,
dag=dag)
labBuilt = DummyOperator(
task_id='labBuilt',
dag=dag)
exceptionControl = ExecuteDagRunOperator(
task_id='exceptionControl',
execute_dag_id="SYS_exception_control",
python_callable=mediation.dag_trigger_exception,
trigger_rule='one_success',
dag=dag)
# graphs
checkCloudFunctions >> checkSqlTables
checkCloudFunctions >> exceptionControl
checkSqlTables >> checkBigQueryTable
checkSqlTables >> exceptionControl
checkBigQueryTable >> labBuilt
checkBigQueryTable >> exceptionControl
Проблема заключается в , что checkSqlTables должен следовать за исключениемконтроль, но он пропускает и DAG заканчивается.Функция возвращает «exceptionControl», как мы видим в журнале checkSqlTables:
{base_task_runner.py:98} INFO - {python_operator.py:90} INFO - Done. Returned value was: exceptionControl
{base_task_runner.py:98} INFO - {python_operator.py:118} INFO - Following branch exceptionControl
{base_task_runner.py:98} INFO - {python_operator.py:119} INFO - Marking other directly downstream tasks as skipped
{base_task_runner.py:98} INFO - {python_operator.py:128} INFO - Done.
Я также играл с атрибутом trigger_rule (one_success, dummy ...), но это не такКажется, работает.
Если я удаляю первый шаг, он, кажется, работает, так что, похоже, это должно быть какая-то проблема конфигурации с моим dag.

Любые идеи, почему функция checkSqlTables не переходит в исключение Control?
РЕДАКТИРОВАТЬ: В новое глубокое чтение документации Airflow я заметил, что если шагпометьте задачу как пропущенную, она будет пропущена навсегда, поэтому мой код никогда не будет работать с операторами ветвления.
Решения, использующие ветвление, состоят в фиктивном шаге перед каждым шагом.Но у меня есть группа DAG, которая имеет более 10 шагов, и схема будет полностью хаосом.