У меня есть пара определенных задач, которые должны быть повторно использованы (определены один раз) и в зависимости от некоторой переменной mode
связаны по-разному, и некоторые задачи могут быть полностью исключены в желаемом даге, но в настоящий момент не могут найти простой способ, как программно исключить все эти задачи из коллекции задач dag
?
(например, в режиме C
задачи t3
и t5
не должны быть частью dag.)
Я знаю о BranchPythonOperator
и triggering_rule
с, но DAG становится очень сложным, потому что семантика depends_on_past
необходима в каждом экземпляре задачи.
mode = 'A' or 'B' or 'C'
with DAG(dag_id=mode,
start_date=...,
schedule_interval=...,
default_args=...) as dag:
start = DummyOperator(task_id='start')
t1 = DummyOperator(task_id='t1')
t2 = DummyOperator(task_id='t2')
t3 = DummyOperator(task_id='t3')
t4 = DummyOperator(task_id='t4')
t5 = DummyOperator(task_id='t5')
end = DummyOperator(task_id='end')
if mode='A':
start >> t1 >> t2 >> t3 >> t4 >> t5 >> end
elif mode='B':
start >> [t1, t2] >> t3 >> t4 >> t5 >> end
elif mode='C':
start >> t1 >> t4 >> t2 >> end
for i, t in enumerate(dag.tasks):
if t.task_id not in end.upstream_task_ids:
# dag.tasks[i].dag = None # error, has no dag setter
dag.tasks.remove(t) # doesn't help
Было бы неплохо иметь dag.remove_task(...)
таким же образом dag.add_task(...)
.