Выполнение одной задачи ПОСЛЕ динамически генерируемых задач через цикл for - PullRequest
2 голосов
/ 09 января 2020

Предположим, у меня есть следующая группа DAG (базовые c функции-заполнители), которая использует for-l oop для динамического генерирования задач (из итерации по списку):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'ETLUSER',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 16, 0, 0, 0),
    'email': ['xxx@xxx.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('xxx', catchup=False,
          default_args=default_args, schedule_interval='0 */4 * * *')


# Some dummy function
def StepOne(x):
  print(x)


def StepTwo():
  print("Okay, we finished all of Step 1.")


some_list = [1, 2, 3, 4, 5, 6]


for t in some_list:
  task_id = f'FirstStep_{t}'
  task = PythonOperator(
      task_id=task_id,
      python_callable=StepOne,
      provide_context=False,
      op_kwargs={'x': str(t)},
      dag=dag
  )
  task

Я хочу чтобы ввести некоторую дополнительную задачу, которая просто:

task2 = PythonOperator(
    task_id="SecondStep",
    python_callable=StepTwo,
    provide_context=False,
    dag=dag
  )

, которая выполняется только после того, как все шаги в первом завершены. Линейно, это будет task >> task2

Как мне go сделать это?

1 Ответ

2 голосов
/ 09 января 2020

Вы можете иметь зависимости задачи с массивом.

Выполнить задачу C после завершения задач A и TaskB.

[taskA, taskB] >> taskC

или

Выполняйте задачу B и задачу C параллельно после завершения задачи A.

taskA >> [taskB, taskC]

, пока 1 сторона восходящего или нисходящего потоков не является массивом.

Таким образом для вашего примера

task1 = []
for t in some_list:
    task_id = f'FirstStep_{t}'
    task1.append(PythonOperator(
        task_id=task_id,
        python_callable=StepOne,
        provide_context=False,
        op_kwargs={'x': str(t)},
        dag=dag))

task2 = PythonOperator(
    task_id="SecondStep",
    python_callable=StepTwo,
    provide_context=False,
    dag=dag)

task1 >> task2
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...