Объясните, зависит от функциональности - PullRequest
5 голосов
/ 06 января 2020

Я изучаю Airflow и посмотрел один из примеров DAG, которые поставляются с Airflow (example_branch_python_dop_operator_3.py)

В этом примере DAG ветвится на одну ветвь за минуту (даты / времени выполнения) ) - четное число, и другая ветвь, если минута - нечетное число. Кроме того, в группе обеспечения доступности баз данных для depends_on_past установлено значение True в качестве значения по умолчанию для всех задач. Полный код:

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': True,
}

# BranchPython operator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
    dag_id='example_branch_dop_operator_v3',
    schedule_interval='*/1 * * * *',
    default_args=args,
)


def should_run(**kwargs):
    print('------------- exec dttm = {} and minute = {}'.
          format(kwargs['execution_date'], kwargs['execution_date'].minute))
    if kwargs['execution_date'].minute % 2 == 0:
        return "dummy_task_1"
    else:
        return "dummy_task_2"


cond = BranchPythonOperator(
    task_id='condition',
    provide_context=True,
    python_callable=should_run,
    dag=dag,
)

dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
cond >> [dummy_task_1, dummy_task_2]

Я ожидал, так как depends_on_past Истина, что после первого запуска DAG задачи больше не смогут запускаться. Каждое задание будет смотреть на состояние предыдущего задания и видеть, что оно было skipped, что не является успешным, и по существу зависать без состояния.

Однако это не то, что произошло. Вот результаты в древовидном представлении:

enter image description here

Как видите, все выбранные задачи выполняются при каждом запуске DAG. Почему это происходит? Я неправильно понимаю, что означает depends_on_past? Я думал, что каждая задача смотрела на состояние задачи с тем же значением task_id в предыдущем прогоне DAG.

Чтобы запустить его, я просто включил DAG в главном интерфейсе, так что я считаю, что они запланированы пробеги.

1 Ответ

3 голосов
/ 12 января 2020

Из журнала изменений для версии Airflow 1.7.1, 2016-05-19

- Treat SKIPPED and SUCCESS the same way when evaluating depends_on_past=True

Похоже, что условие проверяется здесь:

airflow/ti_deps/deps/prev_dagrun_dep.py (master brunch)

line 75: if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:
...