У меня есть оператор Airflow, который позволяет мне запрашивать Athena, которая принимает шаблонный файл Jinja в качестве входного запроса. Обычно я передаю переменные, такие как имена таблиц / баз данных и т. Д. c, в шаблон для создания таблиц и добавления операторов разделов. Это отлично работает для определенных строк.
Мое определение задачи выглядит следующим образом:
db = 'sample_db'
table = 'sample_table'
out = 's3://sample'
p1='2020'
p2='1'
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={'database': db,
'table_name': table,
'p1': p1
'p2': p2},
database=db,
output_location=out
)
Шаблонный файл SQL выглядит следующим образом:
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (partition1= '{{ params.p1 }}', partition2 = '{{ params.p2 }}')
Этот шаблон работает нормально.
Расширение этого позволяет разрешать 'partition1' и 'partition2' определяться с помощью шаблонной переменной jinja, содержащей извлечение XCOM из более ранней задачи, которая преобразует дату в финансовый год и период. Использование даты в качестве раздела возможно, но меня интересует, можно ли заставить параметры принимать шаблоны Jinja.
Код, который я хотел бы использовать, выглядит следующим образом:
db = 'sample_db'
table = 'sample_table'
out = 's3://sample'
p1 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p1") }}'
p2 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p2") }}'
add_partition_task= AWSAthenaOperator(
task_id='add_partition,
query='add_partition.sql',
params={'database': db,
'table_name': table,
'p1': p1
'p2': p2},
database=db,
output_location=out
)
Так что теперь params.p1 и params.p2 содержат шаблон Jinja. Очевидно, что params не поддерживает шаблоны jinja, так как отображаемый SQL содержит строковый литерал '{{task_instance ....', а не отображаемое значение XCOM.
Добавление параметров в поля template_fields в реализации оператора недостаточно, чтобы заставить его отображать шаблон. Мой оператор расширяет только BaseOperator и использует AthenaHook, который расширяет AwsHook. У кого-нибудь есть опыт передачи шаблонных переменных в таких параметрах, как структура или альтернативный подход?