У меня есть набор задач в DAG для Airflow для задания ETL, которое берет данные из открытого источника, преобразует их, а затем делает их доступными в AWS Athena. Данные разбиты на 5 таблиц, в которых хранятся сообщения о похожих событиях. Я делю данные по дате и идентификатору группы.
Моя текущая установка содержит задачу 'add_partition_ {0} _ {1}', где {0} - таблица, а {1} - идентификатор группы. Эта задача существует во вложенном цикле. Внешний цикл проходит по групповым идентификаторам, а внутренний цикл - по таблицам.
В настоящее время я делаю разбиение, используя Jinja для создания шаблона файла add_partition.sql. Мы изменили стандартный оператор athena, чтобы он мог непосредственно читать файл. Однако размер зацикленных элементов был увеличен, поэтому сейчас создается ~ 150 задач разбиения. Я хотел бы иметь возможность иметь одну задачу add_partitions, которая может взять файл .sql и сгенерировать список операторов для добавления каждого раздела в одну команду. Я должен передать список group_ids, список таблиц и дату шаблону и создать операторы.
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (group_id= '{{ params.group }}', date = '{{ ds }}')
Я бы хотел, чтобы операторы, отправляемые оператору, были похожи на
ALTER TABLE db.table ADD IF NOT EXISTS
PARTITION (group_id= '1', date = '2019-09-30')
ALTER TABLE db.table ADD IF NOT EXISTS
PARTITION (group_id= '2', date = '2019-09-30')
ALTER TABLE db.table ADD IF NOT EXISTS
PARTITION (group_id= '3', date = '2019-09-30')
и так далее. Можно ли это сделать с помощью шаблонов Jinja? Я видел, что зацикливание возможно с использованием Jinja, но я не уверен, как применить это здесь?