Как динамически создавать задачи в потоке воздуха - PullRequest
0 голосов
/ 26 марта 2019

Я не могу понять, как динамически создавать задачи в потоке воздуха во время расписания. Мой Dag создается до знания того, сколько задач требуется во время выполнения. Т.е. на каждом триггере dag я хотел бы передать каталог для обработки, чтобы создать список задач для следующего Dag.

Пока что ничего не могу придумать

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


dir = '/home/uname/dir'
filesInDir = next(os.walk(dir))[2] 

for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'

    task1 >> task2

Здесь, как я должен передать переменную 'dir' при запуске Dag, чтобы task1 и task2 работали в зависимости от количества файлов, присутствующих в 'dir'.

1 Ответ

1 голос
/ 26 марта 2019

Вы можете использовать Переменные воздушного потока или переменные среды.

# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")

# Using Env Vars
import os
dir1= os.environ["dir1"]

args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


filesInDir = next(os.walk(dir))[2] 

for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'

    task1 >> task2
...