Как я могу передать полученные из ds параметры оператору hive sql в потоке воздуха - PullRequest
0 голосов
/ 05 февраля 2020

С помощью Airflow я хочу выполнить запрос, который возвращает все данные за указанный период на основе ds. DS всегда моя дата окончания, но дата начала может меняться. Это может быть, например, неделя или целый месяц. Чтобы справиться с этим, я хочу создавать различные даг с графиком, который выполняется ежемесячно или еженедельно. Все идет нормально. Тем не менее, я сталкиваюсь с проблемами, когда хочу передать start_dt

в моем шаблоне sql У меня есть это:

where report_dt between '{{ params.report_start_dt }}' AND '{{ds}}' 

В ежемесячном отчете я хочу передать отчет start_dt как например:

monthly_profile = HiveOperator(
            hql= mycode.sql
            params={**args,
                'report_start_dt': '{{ (execution_date.replace(day=1)).strftime("%Y-%m-%d") }}',
                },
            task_id='monthly_profile',
            )

Однако это не удается, так как шаблон, я думаю, не обрабатывает вложенные переменные.

Предоставленный шаблон:

where event_dt between {{ (execution_date.replace(day=1)).strftime("%Y-%m-%d") }} AND  '2019-07-31'

Я видел этот пост: Воздушный поток: передать {{ds}} в качестве параметра для PostgresOperator , но, по моему мнению, я делаю То же самое, но для гивеоператора.

Что я делаю неправильно и как я могу добиться того, что мне нужно, помня, что я также хочу сделать это с простым смещением ds с 7 днями для моего еженедельного бежать?

1 Ответ

1 голос
/ 06 февраля 2020

Проблема в том, что вы пытаетесь использовать библиотеки python для манипулирования данными в вашем шаблоне. Jinja2 не может понять, что вы хотите, поэтому вы получаете этот отрендеренный шаблон.

Вы можете обойти эту проблему, используя функции Hive SQL, и просто передаете аргумент этим функциям. Таким образом, для начала месяца вы можете использовать:

date_add(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'),
         1 - day(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd')) 
        )

А в потоке воздуха:

monthly_profile = HiveOperator(
            hql= mycode.sql
            task_id='monthly_profile',
            )

В файле SQL

    where report_dt between date_add(day('{{ execution_date }}','%Y-%m-%d'),
               1 - day('{{ execution_date }}','%Y-%m-%d'))                       
          AND '{{ds}}' 
...