Как запустить несколько операторов SQL в шаблоне Jinja воздушного потока с помощью JDBC хук - PullRequest
0 голосов
/ 25 апреля 2018

Попытка запустить hive sql с использованием jdbchook и jinja template через поток воздуха. Шаблон отлично работает для одного оператора SQL, но выдает ошибку синтаксического анализа с несколькими операторами.

  • DAG

    p1 = JdbcOperator( 
    task_id=DAG_NAME+'_create',
    jdbc_conn_id='big_data_hive',
    sql='/mysql_template.sql',
    params={'env': ENVIRON},
    autocommit=True,
    dag=dag)
    
  • Template

    create table {{params.env}}_fct.hive_test_templated
    (cookie_id string
    ,sesn_id string
    ,load_dt string)
    ;
    
    INSERT INTO {{params.env}}_fct.hive_test_templated
    select* from {{params.env}}_fct.hive_test
    ;
    

Ошибка: org.apache.hive.service.cli.HiveSQLException: Ошибка при компиляции оператора: СБОЙ: ParseException строка 7: 0 отсутствует EOF в ';' около ')'

Запросы шаблона работают нормально, когда я запускаю его в Hue.

Ответы [ 2 ]

0 голосов
/ 25 апреля 2018

tobi - это правильно, самый простой способ сделать это - разобрать ваш оператор SQL в список SQL и выполнить их последовательно.

Я делаю это с помощью библиотеки Python sqlparse, чтобы разбить строку на список операторов SQL и затем передать их в ловушку (наследует ловушку dbapi) - базовый класс dbapi принимает списокоператоров SQL и выполняется последовательно, это также может быть легко реализовано в хуевском кусте.В следующем примере мой «CustomSnoqflakeHook» наследуется от хука dbapi, а метод run в хуке dbapi принимает список операторов SQL:

    hook = hooks.CustomSnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
    sql = sqlparse.split(sqlparse.format(self.sql, strip_comments=True))
    hook.run(
        sql,
        autocommit=self.autocommit,
        parameters=self.parameters)

От хука dbapi:

def run(self, sql, autocommit=False, parameters=None):
        """
        Runs a command or a list of commands. Pass a list of sql
        statements to the sql parameter to get them to execute
        sequentially
        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :type sql: str or list
        :param autocommit: What to set the connection's autocommit setting to
            before executing the query.
        :type autocommit: bool
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        """
        if isinstance(sql, basestring):
            sql = [sql]

        with closing(self.get_conn()) as conn:
            if self.supports_autocommit:
                self.set_autocommit(conn, autocommit)

            with closing(conn.cursor()) as cur:
                for s in sql:
                    if sys.version_info[0] < 3:
                        s = s.encode('utf-8')
                    self.log.info(s)
                    if parameters is not None:
                        cur.execute(s, parameters)
                    else:
                        cur.execute(s)

            if not getattr(conn, 'autocommit', False):
                conn.commit()
0 голосов
/ 25 апреля 2018

Мне кажется, что Хюэ по-разному разбирает утверждение. Иногда существуют реализованные разделители операторов, которые позволяют этому происходить.

Кажется, что в воздушном потоке нет этих сепараторов.

Так что самым простым способом было бы разделить эти два оператора и выполнить эти операторы в двух отдельных задачах.

...