Как удалить задачу из DAG - PullRequest
1 голос
/ 05 марта 2020

У меня есть пара определенных задач, которые должны быть повторно использованы (определены один раз) и в зависимости от некоторой переменной mode связаны по-разному, и некоторые задачи могут быть полностью исключены в желаемом даге, но в настоящий момент не могут найти простой способ, как программно исключить все эти задачи из коллекции задач dag?

(например, в режиме C задачи t3 и t5 не должны быть частью dag.)

Я знаю о BranchPythonOperator и triggering_rule с, но DAG становится очень сложным, потому что семантика depends_on_past необходима в каждом экземпляре задачи.

mode = 'A' or 'B' or 'C'

with DAG(dag_id=mode,
         start_date=...,
         schedule_interval=...,
         default_args=...) as dag:

    start = DummyOperator(task_id='start')

    t1 = DummyOperator(task_id='t1')
    t2 = DummyOperator(task_id='t2')
    t3 = DummyOperator(task_id='t3')
    t4 = DummyOperator(task_id='t4')
    t5 = DummyOperator(task_id='t5')

    end = DummyOperator(task_id='end')

    if mode='A':
      start >> t1 >> t2 >> t3 >> t4 >> t5 >> end
    elif mode='B':
      start >> [t1, t2] >> t3 >> t4 >> t5 >> end
    elif mode='C':
      start >> t1 >> t4 >> t2 >> end

    for i, t in enumerate(dag.tasks):
        if t.task_id not in end.upstream_task_ids:
            # dag.tasks[i].dag = None # error, has no dag setter 
            dag.tasks.remove(t) # doesn't help

Было бы неплохо иметь dag.remove_task(...) таким же образом dag.add_task(...).

1 Ответ

0 голосов
/ 06 марта 2020

Я нашел одно "уродливое" решение, используя методы фабрики задач , у которых dag в качестве параметра.

mode = 'A' or 'B' or 'C'

def start(dag: DAG): return DummyOperator(dag=dag,task_id='start')
def t1(dag: DAG): return DummyOperator(dag=dag,task_id='t1')
def t2(dag: DAG): return DummyOperator(dag=dag,task_id='t2')
def t3(dag: DAG): return DummyOperator(dag=dag,task_id='t3')
def t4(dag: DAG): return DummyOperator(dag=dag,task_id='t4')
def t5(dag: DAG): return DummyOperator(dag=dag,task_id='t5')
def end(dag:DAG): return DummyOperator(dag=dag,task_id='end')


with DAG(dag_id=mode,
         start_date=...,
         schedule_interval=...,
         default_args=...) as dag:

    if mode='A':
      start(dag) >> t1(dag) >> t2(dag) >> t3(dag) >> t4(dag) >> t5(dag) >> end(dag)
    elif mode='B':
      start(dag) >> [t1(dag), t2(dag)] >> t3(dag) >> t4(dag) >> t5(dag) >> end(dag)
    elif mode='C':
      start(dag) >> t1(dag) >> t4(dag) >> t2(dag) >> end(dag)
...