У меня довольно простой c рабочий процесс воздушного потока и одно маленькое препятствие, которое я не могу преодолеть. Поэтому моей целью было бы иметь две группы задач. Сначала следует запустить первую группу, затем следующую. Проблема заключается в том, что в группе 2 мне нужно создать зависимости для задач из группы 1. Если определенная задача в группе 1 не выполнена, мы можем пропустить ее зависимость из группы 2.
Итак, это макет Я легко могу достичь: ![enter image description here](https://i.stack.imgur.com/pp6Cz.png)
dag = DAG(
'Example',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
def get_task(name):
return BashOperator(
task_id=name,
bash_command='date',
dag=dag,
trigger_rule=TriggerRule.ALL_DONE
)
t = [get_task(f'Task_{n+1}') for n in range(3)]
d = [get_task(f'Should_depend_on_task_{n+1}') for n in range(3)]
start = get_task('start')
intermediate = get_task('wait_for_1_2_3')
intermediate2 = get_task('wait_for_4_5_6')
end = get_task('end')
start >> t >> intermediate >> d >> intermediate2 >> end
Но мне действительно нужно что-то подобное (розовый означает пропуск): ![enter image description here](https://i.stack.imgur.com/pux1E.png)
Так что я ищу способ динамически проектировать такие зависимости. Или я думаю, что мне действительно нужно было бы динамически сказать, что я хочу пропустить задачу. Любая помощь высоко ценится. Я думал об использовании XCOM, но тогда я понятия не имею, как пропустить задачу во время выполнения. Можно было бы использовать PythonBranchOperator
, но мне нужно было бы добавить его к каждой задаче, которая кажется немного сложной.