Поток воздуха повторно выполнить одну задачу несколько раз в случае успеха - PullRequest
0 голосов
/ 19 февраля 2019

Каков наилучший способ выполнить задачу (A) 3 раза подряд ?:

Это задача A -> задача A -> задача A -> задача B

Я спрашиваюпотому что я буду запускать другую отдельную задачу проверки данных (B), которая будет сравнивать данные из этих трех отдельных прогонов.

Так вот, что я сделал до сих пор:

dag = DAG("hello_world_0", description="Starting tutorial", schedule_interval='* * * * *',
          start_date=datetime(2019, 1, 1),
          catchup=False)

data_pull_1 = BashOperator(task_id='attempt_1', bash_command='echo "Hello World - 1!"',dag=dag)
data_pull_2 = BashOperator(task_id='attempt_2', bash_command='echo "Hello World - 2!"',dag=dag)
data_pull_3 = BashOperator(task_id='attempt_3', bash_command='echo "Hello World - 3!"',dag=dag)

data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"',dag=dag)


data_pull_1 >> data_pull_2 >> data_pull_3 >> data_validation

Это можетработать, но есть ли более элегантный способ?

1 Ответ

0 голосов
/ 19 февраля 2019

Вы можете попробовать ниже реализацию, мы создаем 3 операции, используя цикл for

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    "hello_world_0",
    description="Starting tutorial",
    schedule_interval=None,
    start_date=datetime(2019, 1, 1),
    catchup=False
)

chain_operators = []
max_attempt = 3
for attempt in range(max_attempt):
    data_pull = BashOperator(
        task_id='attempt_{}'.format(attempt),
        bash_command='echo "Hello World - {}!"'.format(attempt),
        dag=dag
    )
    chain_operators.append(data_pull)

data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)
chain_operators.append(data_validation)

# Add downstream
for i,val in enumerate(chain_operators[:-1]):
    val.set_downstream(chain_operators[i+1])

Я изменил schedule_interval на None, потому что с '* * * * *' задание будет запускаться непрерывно

...