Я пытаюсь добиться двойного ветвления в потоке воздуха. Вот образец DAG моей логики c:
import random as r
from airflow.models import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.trigger_rule import TriggerRule
dag = DAG(
dag_id="branching_test_2",
default_args={"start_date": "2020-01-01", "owner": "Airflow"},
schedule_interval=None,
)
def spawn_branch(task_id):
task = BranchPythonOperator(
task_id='branching_' + task_id,
python_callable=branch_func,
provide_context=True,
op_kwargs={'task_id': task_id},
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag)
return task
def spawn_dummy(task_id):
task = DummyOperator(
task_id='dummy_' + task_id,
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
return task
def branch_func(task_id, **context):
if r.randint(2, 100) % 2 == 0:
return task_id
else:
return 'dummy_' + task_id
task_1 = BashOperator(
task_id='task_1',
bash_command='echo task_1 was executed',
dag=dag,
)
task_2 = BashOperator(
task_id='task_2',
bash_command='echo task_2 was executed',
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
task_3 = BashOperator(
task_id='task_3',
bash_command='echo task_3 was executed',
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
task_4 = BashOperator(
task_id='task_4',
bash_command='echo task_4 was executed',
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag,
)
join = DummyOperator(
task_id='join',
trigger_rule=TriggerRule.ONE_SUCCESS,
dag=dag
)
spawn_dummy('start') >> spawn_branch('task_1') >> [task_1, spawn_dummy('task_1')] >> spawn_dummy('after_branch')
task_1 >> spawn_dummy('after_branch') >> [spawn_dummy('splitter_1'), spawn_dummy('splitter_2')]
spawn_dummy('task_1') >> spawn_dummy('after_branch') >> [spawn_dummy('splitter_1'), spawn_dummy('splitter_2')]
spawn_dummy('splitter_1') >> spawn_branch('task_2') >> [task_2, spawn_dummy('task_2')] >> join
spawn_dummy('splitter_2') >> spawn_branch('task_3') >> [task_3, spawn_dummy('task_3')] >> join
join >> spawn_branch('task_4') >> [task_4, spawn_dummy('task_4')]
Визуально DAG выглядит следующим образом (зеленый - выполненный, розовый - пропущенный):
![working dag](https://i.stack.imgur.com/mxGfV.png)
Все работает, как и ожидалось, но проблема в том, что, как вы, наверное, заметили, мне нужна фиктивная задача перед ветвями для задачи 2 и задачи 3.
Если я удаляю их, Airflow не ' t уважать логику оператора Branch c и выполнять обе задачи. Поэтому, если мы изменим зависимость на:
spawn_dummy('start') >> spawn_branch('task_1') >> [task_1, spawn_dummy('task_1')] >> spawn_dummy('after_branch')
task_1 >> spawn_dummy('after_branch') >> [spawn_branch('task_2'), spawn_branch('task_3')]
spawn_dummy('task_1') >> spawn_dummy('after_branch') >> [spawn_branch('task_2'), spawn_branch('task_3')]
spawn_branch('task_2') >> [task_2, spawn_dummy('task_2')] >> join
spawn_branch('task_3') >> [task_3, spawn_dummy('task_3')] >> join
join >> spawn_branch('task_4') >> [task_4, spawn_dummy('task_4')]
График выглядит следующим образом, и, как вы можете видеть, обе задачи после ветвей 2 и 3 выполняются (зеленый - выполненный, розовый - пропущенный):
![enter image description here](https://i.stack.imgur.com/A7T0Q.png)
Странно, что я получаю это в журналах, например для branch2:
[2020-02-12 08: 16: 32,810] {python_operator.py:113} ИНФОРМАЦИЯ - Готово. Возвращаемое значение: dummy_task_2
[2020-02-12 08: 16: 32,811] {python_operator.py:143} INFO - следующая ветвь ['dummy_task_2']
[2020-02- 12 08: 16: 32,812] {python_operator.py:144} ИНФОРМАЦИЯ - Пометка других задач прямого потока как пропущенных
Однако выполняются и задачи_2, и dummy_task2, как вы можете видеть на скриншоте ветка / task3.
Это поведение задумано или это ошибка? Версия My Airflow - 1.10.3