Как использовать переменные Airflow для динамического запуска нескольких заданий для DAG - PullRequest
1 голос
/ 14 апреля 2019

Я знаю, как динамически запускать задачи DAG с использованием Переменная , и она работает довольно хорошо, пока вы не запустите несколько прогонов для одной и той же DAG.

Т.е., как только в какой-то момент будет создан новый каталог с файлами в data / to / load / dir, у меня где-то написан скрипт, который вызовет airflow variables -set dir data/to/load/$newDir, а затем airflow trigger_dag dyn_test. Теперь предположим, что каталоги "a" и "b" созданы (в одно и то же время) в data / to / load / , что сделает airflow variable + airflow trigger_dag вызовы дважды с двумя разными входами при вызове набора переменных (один суффикс) с «а», а другой с «б» конечно). И я вижу два задания, выполняемых для группы DAG в графическом интерфейсе воздушного потока, но проблема в том, что они оба рассматривают одно и то же значение каталога, a или b. Это может определенно означать, что он принимает последний вызов 'airflow variable Set Set'. Как мне решить это? Как запустить несколько прогонов, каждый из которых принимает разные значения (в переменной dir ) для динамического цикла. Мой Даг выглядит примерно так:

# Using Airflow Variables
from airflow.models import Variable
dir = Variable.get("dir")


args = {
    'owner': 'airflow',
    'start_date': datetime(2004, 11, 12),
}

dag = DAG(
    dag_id='dyn_test',
    default_args=args,
    schedule_interval='@once'
)


filesInDir = next(os.walk(dir))[2] 

for file in filesInDir:
    task1 = # change 'file' structure
    task2 = # store changed 'file'

    task1 >> task2

1 Ответ

1 голос
/ 14 апреля 2019

Сценарий, описанный в вашем вопросе, - это сценарий, в котором очередь первым пришел-первым вышел подходит, если вы хотите сохранить текущий способ явной установки каталога для обработки в виде отдельной последовательности. .

Тем не менее, команда Airflow CLI trigger_dags позволяет передать флаг --conf для установки словаря конфигурации, переданного в DagRun, и я пойду так, как вы уже описали, что там, где установлена ​​переменная, там это сработал даг.

http://airflow.apache.org/cli.html#trigger_dag

Вот как это будет выглядеть в коде.

airflow trigger_dag dyn_test --conf '{"me_seeks.dir": "data/to/load/$newDir"}'

Вы установите provide_context kwargs в том, какого оператора воздушного потока вы используете для задач.

Экземпляр DagRun можно получить в контексте, а значение dir установить в полученной конфигурации.

Скажите, что вы определили свои задачи с помощью Airflow PythonOperator; тогда ваш код для получения dir в python_callable будет выглядеть примерно так:

def me_seeks(dag_run=None):
    dir = dag_run.conf['me_seeks.dir']
...