Воздушный поток Jinja Templating в параметрах - PullRequest
0 голосов
/ 15 января 2020

У меня есть оператор Airflow, который позволяет мне запрашивать Athena, которая принимает шаблонный файл Jinja в качестве входного запроса. Обычно я передаю переменные, такие как имена таблиц / баз данных и т. Д. c, в шаблон для создания таблиц и добавления операторов разделов. Это отлично работает для определенных строк.

Мое определение задачи выглядит следующим образом:

        db = 'sample_db'
        table = 'sample_table'
        out = 's3://sample'
        p1='2020'
        p2='1'

        add_partition_task= AWSAthenaOperator(
            task_id='add_partition,
            query='add_partition.sql',
            params={'database': db,
                    'table_name': table,
                    'p1': p1
                    'p2': p2},
            database=db,
            output_location=out
        )

Шаблонный файл SQL выглядит следующим образом:

ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (partition1= '{{ params.p1 }}', partition2 = '{{ params.p2 }}')

Этот шаблон работает нормально.

Расширение этого позволяет разрешать 'partition1' и 'partition2' определяться с помощью шаблонной переменной jinja, содержащей извлечение XCOM из более ранней задачи, которая преобразует дату в финансовый год и период. Использование даты в качестве раздела возможно, но меня интересует, можно ли заставить параметры принимать шаблоны Jinja.

Код, который я хотел бы использовать, выглядит следующим образом:

        db = 'sample_db'
        table = 'sample_table'
        out = 's3://sample'
        p1 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p1") }}'
        p2 = '{{ task_instance.xcom_pull(task_ids="convert_to_partition", key="p2") }}'

        add_partition_task= AWSAthenaOperator(
            task_id='add_partition,
            query='add_partition.sql',
            params={'database': db,
                    'table_name': table,
                    'p1': p1
                    'p2': p2},
            database=db,
            output_location=out
        )

Так что теперь params.p1 и params.p2 содержат шаблон Jinja. Очевидно, что params не поддерживает шаблоны jinja, так как отображаемый SQL содержит строковый литерал '{{task_instance ....', а не отображаемое значение XCOM.

Добавление параметров в поля template_fields в реализации оператора недостаточно, чтобы заставить его отображать шаблон. Мой оператор расширяет только BaseOperator и использует AthenaHook, который расширяет AwsHook. У кого-нибудь есть опыт передачи шаблонных переменных в таких параметрах, как структура или альтернативный подход?

...