Как передать Jinja2 в HiveQL, используя Python - PullRequest
0 голосов
/ 20 декабря 2018

Я использую Gcloud Composer в качестве своего воздушного потока.Когда я пытаюсь использовать Jinja в своем HQL-коде, он не переводится правильно.Я знаю, что HiveOperator имеет переводчик Jinja, как я к нему привык, но DataProcHiveOperator этого не делает.

Я пытался использовать HiveConfнепосредственно в мои файлы HQL, но при установке этих значений в мой раздел (т.е. INSERT INTO TABLE abc PARTITION (ds = ${hiveconf:ds})) `, это не работает.Я также добавил следующее в мой HQL-файл:

SET ds=to_date(current_timestamp());

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

Но это не сработало, поскольку HIVE преобразует приведенную выше формулу в STRING.

Так что моей идеей было объединить обаоператоры, чтобы переводчик Jinja работал нормально, но когда я это делаю, я получаю следующую ошибку: ERROR - submit() takes from 3 to 4 positional arguments but 5 were given.

Я не очень хорошо знаком с кодированием Python, и любая помощь будет отличной, смотрите код нижедля оператора, который я пытаюсь построить;

Заголовок файла Python (обратите внимание, что файл содержит другие операторы, не упомянутые в этом вопросе):

import ntpath
import os
import re
import time
import uuid
from datetime import timedelta

from airflow.contrib.hooks.gcp_dataproc_hook import DataProcHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
from googleapiclient.errors import HttpError
from airflow.utils import timezone
from airflow.utils.operator_helpers import context_to_airflow_vars

модифицированный DataprocHiveOperator:

class DataProcHiveOperator(BaseOperator):

template_fields = ['query', 'variables', 'job_name', 'cluster_name', 'dataproc_jars']
template_ext = ('.q',)
ui_color = '#0273d4'

@apply_defaults
def __init__(
        self,
        query=None,
        query_uri=None,
        hiveconfs=None,
        hiveconf_jinja_translate=False,
        variables=None,
        job_name='{{task.task_id}}_{{ds_nodash}}',
        cluster_name='cluster-1',
        dataproc_hive_properties=None,
        dataproc_hive_jars=None,
        gcp_conn_id='google_cloud_default',
        delegate_to=None,
        region='global',
        job_error_states=['ERROR'],
        *args,
        **kwargs):

    super(DataProcHiveOperator, self).__init__(*args, **kwargs)
    self.gcp_conn_id = gcp_conn_id
    self.delegate_to = delegate_to
    self.query = query
    self.query_uri = query_uri
    self.hiveconfs = hiveconfs or {}
    self.hiveconf_jinja_translate = hiveconf_jinja_translate
    self.variables = variables
    self.job_name = job_name
    self.cluster_name = cluster_name
    self.dataproc_properties = dataproc_hive_properties
    self.dataproc_jars = dataproc_hive_jars
    self.region = region
    self.job_error_states = job_error_states

def prepare_template(self):
    if self.hiveconf_jinja_translate:
        self.query_uri= re.sub(
            "(\$\{(hiveconf:)?([ a-zA-Z0-9_]*)\})", "{{ \g<3> }}", self.query_uri)

def execute(self, context):
    hook = DataProcHook(gcp_conn_id=self.gcp_conn_id,
                        delegate_to=self.delegate_to)

    job = hook.create_job_template(self.task_id, self.cluster_name, "hiveJob",
                                   self.dataproc_properties)

    if self.query is None:
        job.add_query_uri(self.query_uri)
    else:
        job.add_query(self.query)

    if self.hiveconf_jinja_translate:
        self.hiveconfs = context_to_airflow_vars(context)
    else:
        self.hiveconfs.update(context_to_airflow_vars(context))

    job.add_variables(self.variables)
    job.add_jar_file_uris(self.dataproc_jars)
    job.set_job_name(self.job_name)

    job_to_submit = job.build()
    self.dataproc_job_id = job_to_submit["job"]["reference"]["jobId"]

    hook.submit(hook.project_id, job_to_submit, self.region, self.job_error_states)

Я хотел бы иметь возможность использовать Jinja-шаблоны внутри моего HQL-кода, чтобы позволить автоматизацию секций в моем конвейере данных.

PS: Я буду использовать Jinja-шаблоны в основном для Partition DateStamp

Кто-нибудь знает, что за сообщение об ошибке я получаю + помогите мне ее решить?

ERROR - submit() takes from 3 to 4 positional arguments but 5 were given

Спасибо!

1 Ответ

0 голосов
/ 21 декабря 2018

Это из-за 5-го аргумента job_error_states, который находится только в master, а не в текущем стабильном выпуске (1.10.1).

Исходный код для 1.10.1 -> https://github.com/apache/incubator-airflow/blob/76a5fc4d2eb3c214ca25406f03b4a0c5d7250f71/airflow/contrib/hooks/gcp_dataproc_hook.py#L219

Так что удалите этот параметр, и он должен работать.

...