У меня есть рабочий процесс, в котором у меня есть два параллельных процесса (sentinel_run
и sentinel_skip
), которые должны выполняться или пропускаться в зависимости от условия, а затем объединяться (resolve
). Мне нужно, чтобы задачи, расположенные непосредственно после любой задачи sentinel_
, каскадно пропускались, но когда она попадает к задаче resolve
, resolve
должна запускаться, если в каком-либо из процессов выше не происходит сбоев.
На основе документация , должно работать правило триггера none_failed:
none_failed: все родители не потерпели неудачу (отказ или upstream_failed), т.е. все родители преуспели или были пропущены
и это также ответ на связанный вопрос .
Однако, когда я реализовал тривиальный пример, это не то, что я вижу:
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago
dag = DAG(
"testing",
catchup=False,
schedule_interval="30 12 * * *",
default_args={
"owner": "test@gmail.com",
"start_date": days_ago(1),
"catchup": False,
"retries": 0
}
)
start = DummyOperator(task_id="start", dag=dag)
sentinel_run = ShortCircuitOperator(task_id="sentinel_run", dag=dag, python_callable=lambda: True)
sentinel_skip = ShortCircuitOperator(task_id="sentinel_skip", dag=dag, python_callable=lambda: False)
a = DummyOperator(task_id="a", dag=dag)
b = DummyOperator(task_id="b", dag=dag)
c = DummyOperator(task_id="c", dag=dag)
d = DummyOperator(task_id="d", dag=dag)
e = DummyOperator(task_id="e", dag=dag)
f = DummyOperator(task_id="f", dag=dag)
g = DummyOperator(task_id="g", dag=dag)
resolve = DummyOperator(task_id="resolve", dag=dag, trigger_rule="none_failed")
start >> sentinel_run >> a >> b >> c >> resolve
start >> sentinel_skip >> d >> e >> f >> resolve
resolve >> g
Этот код создает следующий знак:
Проблема заключается в том, что задача resolved
должна выполняться (потому что ничто в восходящем направлении не является либо upstream_failed
или failed
), но вместо этого он пропускает.
Я изучил базу данных, и нет скрытых сбойных или вышестоящих сбойных задач, и я не могу понять, почему это не будетсоблюдать логику «none_failed».
Я знаю о "некрасивом обходном пути" и реализовал его в других рабочих процессах, но он добавляет еще одну задачу для выполнения и увеличивает сложность, с которой приходится сталкиваться новым пользователям в группе обеспечения доступности баз данных (особенно при умноженииэто несколькими задачами ...). Это было моей основной причиной для обновления с Airflow 1.8 до Airflow 1.10, поэтому я надеюсь, что мне просто не хватает чего-то очевидного ...