Является ли выполнение динамического цикла (из переменных) для создания задач воздушного потока хорошим подходом? - PullRequest
0 голосов
/ 29 марта 2019

У меня есть простой DAG с воздушным потоком, скажем, с 2 заданиями. как то так:

newDirToLoad = Variable.get("path")
filesInDir = next(os.walk(newDirToLoad))[2] 

for f in filesInDir:
    task1 = BashOperator(
        task_id = "load_for_" + str(f),
        params = {"fileToProcess" : newDirToLoad + "/" + f}
        # ...
    )

    task1 = BashOperator(
        # ...
    )
    task1 >> task2

Здесь переменная path изначально настроена на какой-то фиктивный каталог, чтобы мой Dag не потерпел неудачу при создании Dag. Как только в какой-то момент создается новый каталог с файлами в каталоге «data / to / load /» dir, у меня где-то есть сценарий, который запускает airflow variables -set path data/to/load/$newDir, а затем airflow trigger_dag myDag. Это работает довольно хорошо, и я вижу, что число задач в графическом интерфейсе воздушного потока такое же, как и количество файлов в $ newDir. Но я думаю, что это своего рода настройка, позволяющая создавать динамические задачи с использованием функции Переменная . Есть ли хороший подход? Является ли плохой практикой изначально устанавливать переменную пути для некоторого фиктивного каталога для успешного создания Дага?

...