После поиска и поиска этого связанного вопроса .Я нашел способ заставить эту работу (хотя, как мы увидим, это не идеальное решение).Вот рабочий пример из трех частей: (1) файл шаблона sql с небольшим количеством шаблонов jinja, (2) DAG и (3) команда gcloud
, необходимая для загрузки шаблона в нужное место.
(1) Файл шаблона sql Это просто текстовый файл, имя файла которого заканчивается расширением .sql
.Допустим, этот файл называется my-templated-query.sql
и содержит:
SELECT COUNT(1)
FROM mytable
WHERE _PARTITIONTIME = TIMESTAMP('{{ ds }}')
(2) Ссылка на шаблон в файле DAG Для ссылки на этот шаблон создайтеоператор, подобный следующему:
count_task = BigQueryOperator(
task_id='count_rows',
sql='/my-templated-query.sql')
(3) Добавление файла шаблона в Google Cloud Composer Получается, что по умолчанию airflow ищет файлы шаблона впапка дагс.Чтобы загрузить наш шаблонный файл в папку dags, мы запускаем
gcloud beta composer environments storage dags import --environment my-env-name --location us-central1 --source path/to/my-templated-query.sql
Вам необходимо соответственно заменить имя env, местоположение и исходный путь.
Не совсем правильно загружать все эти шаблоны в папку dag.Лучшей практикой Airflow является размещение ваших шаблонов в отдельной папке и указание параметра template_searchpath
на , указывающего на него при создании DAG .Однако я не уверен, как это сделать с помощью Google Cloud Composer.
Обновление: Я понял, что можно поместить подпапки в папку DAG, что полезно для организации большихномера шаблонов SQL.Допустим, я поместил файл шаблона SQL в DAG_FOLDER/dataset1/table1.sql
. В BigQueryOperator Ithen затем может сослаться на это, используя sql=/dataset1/table1.sql
.Если у вас есть подпапка с большим количеством файлов и множеством других подпапок, вы также можете использовать dag import
, показанный выше, для рекурсивной загрузки всей подпапки - просто укажите ее на подпапку.