как распараллелить похожие задачи BashOperator, но разные параметры в DAG Airflow - PullRequest
0 голосов
/ 04 апреля 2019

У меня есть параллельное выполнение 2 задач ниже в моем DAG В реальном мире это могут быть 15 или 20 задач с входными параметрами из массива, как показано ниже.

fruits = ["apples", "bananas"]

bad_dag = DAG('bad_dag_3', default_args=default_args, schedule_interval=None)

t0=BashOperator(
    task_id="print",
    bash_command='echo "Beginning parallel tasks next..." ',
    dag=bad_dag)

t1=BashOperator(
    task_id="fruit_"+fruits[0],
    params={"fruits": fruits}, 
    bash_command='echo fruit= {{params.fruits[0]}} ',
    dag=bad_dag)

t2=BashOperator(
    task_id="fruit_"+fruits[1],
    params={"fruits": fruits},
    bash_command='echo fruit= {{params.fruits[1]}} ',
    dag=bad_dag)

t0>>[t1, t2]

Какой лучший способ написать этот DAG для меня, так что мне не нужно переписывать один и тот же BashOperator снова и снова, как я описал выше.

Я не могу использовать цикл, потому что не могу распараллелить задачи, если использую цикл.

1 Ответ

0 голосов
/ 05 апреля 2019

Используйте нижеприведенный DAG.Идея состоит в том, что task_id для каждой задачи должен быть уникальным, а воздушный поток будет обрабатывать все остальное.

fruits = ["apples", "bananas"]

bad_dag = DAG('bad_dag_3', default_args=default_args, schedule_interval=None)

t0=BashOperator(
    task_id="print",
    bash_command='echo "Beginning parallel tasks next..." ',
    dag=bad_dag)

for fruit in fruits:
    task_t = BashOperator(
        task_id="fruit_"+fruit,
        params={"fruit": fruit},
        bash_command='echo fruit= {{params.fruit}} ',
        dag=bad_dag)

    t0 >> task_t
...