Воздушный поток не уважает оператора ветвления - PullRequest
0 голосов
/ 12 февраля 2020

Я пытаюсь добиться двойного ветвления в потоке воздуха. Вот образец DAG моей логики c:

import random as r

from airflow.models import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id="branching_test_2",
    default_args={"start_date": "2020-01-01", "owner": "Airflow"},
    schedule_interval=None,
)

def spawn_branch(task_id):
    task = BranchPythonOperator(
        task_id='branching_' + task_id,
        python_callable=branch_func,
        provide_context=True,
        op_kwargs={'task_id': task_id},
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag)

    return task


def spawn_dummy(task_id):
    task = DummyOperator(
        task_id='dummy_' + task_id,
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag,
    )

    return task


def branch_func(task_id, **context):

    if r.randint(2, 100) % 2 == 0:
        return task_id
    else:
        return 'dummy_' + task_id


task_1 = BashOperator(
    task_id='task_1',
    bash_command='echo task_1 was executed',
    dag=dag,
)

task_2 = BashOperator(
    task_id='task_2',
    bash_command='echo task_2 was executed',
    trigger_rule=TriggerRule.NONE_FAILED,
    dag=dag,
)

task_3 = BashOperator(
    task_id='task_3',
    bash_command='echo task_3 was executed',
    trigger_rule=TriggerRule.NONE_FAILED,
    dag=dag,
)

task_4 = BashOperator(
    task_id='task_4',
    bash_command='echo task_4 was executed',
    trigger_rule=TriggerRule.NONE_FAILED,
    dag=dag,
)

join = DummyOperator(
        task_id='join',
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag=dag
)

spawn_dummy('start') >> spawn_branch('task_1') >> [task_1, spawn_dummy('task_1')] >> spawn_dummy('after_branch')

task_1 >> spawn_dummy('after_branch') >> [spawn_dummy('splitter_1'), spawn_dummy('splitter_2')]
spawn_dummy('task_1') >> spawn_dummy('after_branch') >> [spawn_dummy('splitter_1'), spawn_dummy('splitter_2')]

spawn_dummy('splitter_1') >> spawn_branch('task_2') >> [task_2, spawn_dummy('task_2')] >> join

spawn_dummy('splitter_2') >> spawn_branch('task_3') >> [task_3, spawn_dummy('task_3')] >> join

join >> spawn_branch('task_4') >> [task_4, spawn_dummy('task_4')]

Визуально DAG выглядит следующим образом (зеленый - выполненный, розовый - пропущенный):

working dag

Все работает, как и ожидалось, но проблема в том, что, как вы, наверное, заметили, мне нужна фиктивная задача перед ветвями для задачи 2 и задачи 3.

Если я удаляю их, Airflow не ' t уважать логику оператора Branch c и выполнять обе задачи. Поэтому, если мы изменим зависимость на:

spawn_dummy('start') >> spawn_branch('task_1') >> [task_1, spawn_dummy('task_1')] >> spawn_dummy('after_branch')

task_1 >> spawn_dummy('after_branch') >> [spawn_branch('task_2'), spawn_branch('task_3')]
spawn_dummy('task_1') >> spawn_dummy('after_branch') >> [spawn_branch('task_2'), spawn_branch('task_3')]

spawn_branch('task_2') >> [task_2, spawn_dummy('task_2')] >> join

spawn_branch('task_3') >> [task_3, spawn_dummy('task_3')] >> join

join >> spawn_branch('task_4') >> [task_4, spawn_dummy('task_4')]

График выглядит следующим образом, и, как вы можете видеть, обе задачи после ветвей 2 и 3 выполняются (зеленый - выполненный, розовый - пропущенный):

enter image description here

Странно, что я получаю это в журналах, например для branch2:

[2020-02-12 08: 16: 32,810] {python_operator.py:113} ИНФОРМАЦИЯ - Готово. Возвращаемое значение: dummy_task_2

[2020-02-12 08: 16: 32,811] {python_operator.py:143} INFO - следующая ветвь ['dummy_task_2']

[2020-02- 12 08: 16: 32,812] {python_operator.py:144} ИНФОРМАЦИЯ - Пометка других задач прямого потока как пропущенных

Однако выполняются и задачи_2, и dummy_task2, как вы можете видеть на скриншоте ветка / task3.

Это поведение задумано или это ошибка? Версия My Airflow - 1.10.3

...