Передача шаблонных переменных в HiveOperator - PullRequest
0 голосов
/ 08 октября 2018

У меня есть шаблон jinja, который я планирую использовать для динамической генерации SQL в Hive.Мой шаблон выглядит следующим образом:

USE {{ db }};

CREATE EXTERNAL TABLE IF NOT EXISTS foo (
    A int,
    B int
)
stored as parquet
location ‘….’;

«db» - это то, что может быть получено путем вызова функции.Я решил написать оператор расширения HiveExecOperator.В моей среде иерархия классов выглядит так:

BaseOperator <—— BaseExecOperator <- HiveExecOperator </p>

Мой оператор TestHive выглядит следующим образом:

class TestHive(HiveExecOperator):
    def pre_execute(self, context):
        context[‘db’] = func1(…,,)
        return context['ti'].render_templates()

Этот оператор не работает как{{db}} внутри шаблона ничего не получает и оператор hive не выполняется.Я также попытался переопределить render_template в TestHive следующим образом:

class TestHive(HiveExecOperator):
    def render_template(self, attr, content, context):
    context['db'] = func1(..,)
    return super(TestHive, self).render_templates(attr, content, context)

Этот сбой, поскольку родительский класс TestHive не имеет метода render_templates.

Method: render_templates" is only defined in BaseOperator.

Любая помощь приветствуется.

1 Ответ

0 голосов
/ 10 октября 2018

Предполагая, что вы имеете в виду HiveOperator, а не HiveExecOperator, и, глядя на то, что вы описываете, я не думаю, что вам нужно здесь выводить какой-либо оператор.Если нет какой-то дополнительной отсутствующей информации, которую я не вижу, вы просто спрашиваете, как передать значение вызова функции в качестве параметра в шаблонную команду.

Аргумент hql HiveOperator представляет собой поле шаблона .Это означает, что вы должны иметь возможность просто определить свой шаблон, как вы уже сделали, а затем предоставить ему значение как часть этого вызова оператора.Но не забудьте поставить префикс переданной переменной с params.Смотри:

my_query= """
    USE {{ params.db }};

    CREATE EXTERNAL TABLE IF NOT EXISTS foo (
    A int,
    B int
    )
    stored as parquet
    location .......
    """

run_hive_query = HiveOperator(
    task_id="my_task",
    hql=my_query,
    params={ 'db': func1(...) },
    dag=dag
)
...