У меня есть простой DAG с воздушным потоком, скажем, с 2 заданиями. как то так:
newDirToLoad = Variable.get("path")
filesInDir = next(os.walk(newDirToLoad))[2]
for f in filesInDir:
task1 = BashOperator(
task_id = "load_for_" + str(f),
params = {"fileToProcess" : newDirToLoad + "/" + f}
# ...
)
task1 = BashOperator(
# ...
)
task1 >> task2
Здесь переменная path изначально настроена на какой-то фиктивный каталог, чтобы мой Dag не потерпел неудачу при создании Dag.
Как только в какой-то момент создается новый каталог с файлами в каталоге «data / to / load /» dir, у меня где-то есть сценарий, который запускает airflow variables -set path data/to/load/$newDir
, а затем airflow trigger_dag myDag
. Это работает довольно хорошо, и я вижу, что число задач в графическом интерфейсе воздушного потока такое же, как и количество файлов в $ newDir. Но я думаю, что это своего рода настройка, позволяющая создавать динамические задачи с использованием функции Переменная . Есть ли хороший подход? Является ли плохой практикой изначально устанавливать переменную пути для некоторого фиктивного каталога для успешного создания Дага?