Я изучаю Airflow и посмотрел один из примеров DAG, которые поставляются с Airflow (example_branch_python_dop_operator_3.py)
В этом примере DAG ветвится на одну ветвь за минуту (даты / времени выполнения) ) - четное число, и другая ветвь, если минута - нечетное число. Кроме того, в группе обеспечения доступности баз данных для depends_on_past
установлено значение True
в качестве значения по умолчанию для всех задач. Полный код:
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
'depends_on_past': True,
}
# BranchPython operator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
default_args=args,
)
def should_run(**kwargs):
print('------------- exec dttm = {} and minute = {}'.
format(kwargs['execution_date'], kwargs['execution_date'].minute))
if kwargs['execution_date'].minute % 2 == 0:
return "dummy_task_1"
else:
return "dummy_task_2"
cond = BranchPythonOperator(
task_id='condition',
provide_context=True,
python_callable=should_run,
dag=dag,
)
dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
cond >> [dummy_task_1, dummy_task_2]
Я ожидал, так как depends_on_past
Истина, что после первого запуска DAG задачи больше не смогут запускаться. Каждое задание будет смотреть на состояние предыдущего задания и видеть, что оно было skipped
, что не является успешным, и по существу зависать без состояния.
Однако это не то, что произошло. Вот результаты в древовидном представлении:
![enter image description here](https://i.stack.imgur.com/56xvI.png)
Как видите, все выбранные задачи выполняются при каждом запуске DAG. Почему это происходит? Я неправильно понимаю, что означает depends_on_past
? Я думал, что каждая задача смотрела на состояние задачи с тем же значением task_id в предыдущем прогоне DAG.
Чтобы запустить его, я просто включил DAG в главном интерфейсе, так что я считаю, что они запланированы пробеги.