Airflow BranchPythonOperator не следует за указанной веткой - PullRequest
0 голосов
/ 23 октября 2018

У меня есть AIRFLOW DAG со следующей структурой.

Airflow Graph View

Все функции, которые начинаются с «check *» - это BranchPythonOperator и функцияexceptionControl - это ExecuteDagRunOperator, который получает каждую ошибку, чтобы обработать их.

Это конфигурация DAG:

checkCloudFunctions = BranchPythonOperator(
    task_id='checkCloudFunctions',
    python_callable=check_cloud_functions,
    provide_context=True,
    dag=dag)

checkSqlTables = BranchPythonOperator(
    task_id='checkSqlTables',
    python_callable=check_sql_tables,
    provide_context=True,
    dag=dag)

checkBigQueryTable = BranchPythonOperator(
    task_id='checkBigQueryTable',
    python_callable=check_big_query_table,
    provide_context=True,
    dag=dag)

labBuilt = DummyOperator(
    task_id='labBuilt',
    dag=dag)

exceptionControl = ExecuteDagRunOperator(
    task_id='exceptionControl',
    execute_dag_id="SYS_exception_control",
    python_callable=mediation.dag_trigger_exception,
    trigger_rule='one_success',
    dag=dag)

# graphs
checkCloudFunctions >> checkSqlTables
checkCloudFunctions >> exceptionControl

checkSqlTables >> checkBigQueryTable
checkSqlTables >> exceptionControl

checkBigQueryTable >> labBuilt
checkBigQueryTable >> exceptionControl

Проблема заключается в , что checkSqlTables должен следовать за исключениемконтроль, но он пропускает и DAG заканчивается.Функция возвращает «exceptionControl», как мы видим в журнале checkSqlTables:

   {base_task_runner.py:98} INFO - {python_operator.py:90} INFO - Done. Returned value was: exceptionControl
   {base_task_runner.py:98} INFO - {python_operator.py:118} INFO - Following branch exceptionControl
   {base_task_runner.py:98} INFO - {python_operator.py:119} INFO - Marking other directly downstream tasks as skipped
   {base_task_runner.py:98} INFO - {python_operator.py:128} INFO - Done.

Я также играл с атрибутом trigger_rule (one_success, dummy ...), но это не такКажется, работает.

Если я удаляю первый шаг, он, кажется, работает, так что, похоже, это должно быть какая-то проблема конфигурации с моим dag.

enter image description here

Любые идеи, почему функция checkSqlTables не переходит в исключение Control?

РЕДАКТИРОВАТЬ: В новое глубокое чтение документации Airflow я заметил, что если шагпометьте задачу как пропущенную, она будет пропущена навсегда, поэтому мой код никогда не будет работать с операторами ветвления.

Решения, использующие ветвление, состоят в фиктивном шаге перед каждым шагом.Но у меня есть группа DAG, которая имеет более 10 шагов, и схема будет полностью хаосом.

...