Генерация uuid и использование его через Airflow DAG - PullRequest
0 голосов
/ 18 апреля 2019

Я пытаюсь создать динамический поток воздуха, который имеет следующие 2 задачи: Задача 1. Создает файлы с сгенерированным UUID как часть их имени. Задача 2. Запускает проверку этих файлов

.определить переменную 'FILE_UUID' и установить ее следующим образом: str (uuid.uuid4 ()).А также создал постоянное имя файла: MY_FILE = '{file_uuid} _file.csv'.format (file_uuid = FILE_UUID}

Затем - Задача 1 является bashOperator, который получает MY_FILE как часть команды, и создаетфайл успешно. Я вижу, что сгенерированные файлы содержат определенный UUID в имени,

TASK 2 терпит неудачу - это PythonOperator, который получает MY_FILE как op_args. Но не может получить доступ к файлу. Журналы показывают, что он пытаетсядля доступа к файлам с другим UUID.

Почему моя «константа» запускается отдельно для каждой задачи? Есть ли способ предотвратить это?

Я использую Airflow 1.10мой исполнитель - LocalExecutor.

Я попытался установить константу вне «with DAG» и внутри нее, также пытается работать с макросами, но тогда PythonOperator просто использует строки макросов буквально, используя значения, которые они содержат.

1 Ответ

1 голос
/ 23 апреля 2019

Необходимо помнить, что файл определения DAG является своего рода «сценарием конфигурации», а не фактическим исполняемым файлом для запуска ваших групп DAG. Задачи выполняются в совершенно разных средах, в большинстве случаев даже не на одной машине. Думайте об этом как о конфигурационном XML, который устанавливает ваши задачи, а затем они создаются и выполняются на каком-то другом компьютере в облаке - но это не Python, а Python.

В заключение - ваш код DAG - это Python, но он не исполняется во время выполнения ваших задач. Таким образом, если вы сгенерируете случайный uuid там, он будет оцениваться в неизвестное время и несколько раз - для каждой задачи на разных машинах.

Для обеспечения согласованности между задачами вам нужно найти другой способ, например:

  • использует XCOM таким образом, чтобы первые задачи использовали uuid, который он получает, а затем записывает его в XCOM для всех последующих задач.
  • привязать ваш uuid к чему-то постоянному в вашем конвейере, источнике, дате или чем-либо еще (например, если это ежедневное задание, вы можете построить свой uuid из частей даты, смешанных с некоторыми особенностями dag / task и т. Д.) - что угодно сделайте ваш uuid одинаковым для всех задач, но уникальным для уникальных дней)
...