Динамически генерирующие разделы в Airflow - PullRequest
0 голосов
/ 30 сентября 2019

У меня есть набор задач в DAG для Airflow для задания ETL, которое берет данные из открытого источника, преобразует их, а затем делает их доступными в AWS Athena. Данные разбиты на 5 таблиц, в которых хранятся сообщения о похожих событиях. Я делю данные по дате и идентификатору группы.

Моя текущая установка содержит задачу 'add_partition_ {0} _ {1}', где {0} - таблица, а {1} - идентификатор группы. Эта задача существует во вложенном цикле. Внешний цикл проходит по групповым идентификаторам, а внутренний цикл - по таблицам.

В настоящее время я делаю разбиение, используя Jinja для создания шаблона файла add_partition.sql. Мы изменили стандартный оператор athena, чтобы он мог непосредственно читать файл. Однако размер зацикленных элементов был увеличен, поэтому сейчас создается ~ 150 задач разбиения. Я хотел бы иметь возможность иметь одну задачу add_partitions, которая может взять файл .sql и сгенерировать список операторов для добавления каждого раздела в одну команду. Я должен передать список group_ids, список таблиц и дату шаблону и создать операторы.

ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
  PARTITION (group_id= '{{ params.group }}', date = '{{ ds }}')

Я бы хотел, чтобы операторы, отправляемые оператору, были похожи на

ALTER TABLE db.table ADD IF NOT EXISTS
  PARTITION (group_id= '1', date = '2019-09-30')
ALTER TABLE db.table ADD IF NOT EXISTS
  PARTITION (group_id= '2', date = '2019-09-30')
ALTER TABLE db.table ADD IF NOT EXISTS
  PARTITION (group_id= '3', date = '2019-09-30')

и так далее. Можно ли это сделать с помощью шаблонов Jinja? Я видел, что зацикливание возможно с использованием Jinja, но я не уверен, как применить это здесь?

1 Ответ

0 голосов
/ 03 октября 2019

Оказывается, это невероятно легко сделать с некоторыми очень простыми шаблонизаторами Jinja.

{% for group in params.groups %}
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
  PARTITION (group_id= '{{ group }}', date = '{{ ds }}')
{% endfor %}

Затем в коде Python, где я использую AWSAthenaOperator, я зацикливаюсь по таблицам и передаю список идентификаторов групп,база данных и имя_таблицы в качестве параметров (ds предоставляется макросами Airflow).

...