Сценарий, описанный в вашем вопросе, - это сценарий, в котором очередь первым пришел-первым вышел подходит, если вы хотите сохранить текущий способ явной установки каталога для обработки в виде отдельной последовательности. .
Тем не менее, команда Airflow CLI trigger_dags
позволяет передать флаг --conf
для установки словаря конфигурации, переданного в DagRun
, и я пойду так, как вы уже описали, что там, где установлена переменная, там это сработал даг.
http://airflow.apache.org/cli.html#trigger_dag
Вот как это будет выглядеть в коде.
airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'
Вы установите provide_context
kwargs
в том, какого оператора воздушного потока вы используете для задач.
Экземпляр DagRun можно получить в контексте, а значение dir
установить в полученной конфигурации.
Скажите, что вы определили свои задачи с помощью Airflow PythonOperator
; тогда ваш код для получения dir
в python_callable
будет выглядеть примерно так:
def me_seeks(dag_run=None):
dir = dag_run.conf['me_seeks.dir']