Доступ к аргументу params в пользовательском операторе в Apache Airflow - PullRequest
0 голосов
/ 27 февраля 2019

Проблема

Я хотел бы передать список значений или даже любое значение в качестве аргумента пользовательскому оператору, изменить значения в операторе, а затем получить доступ к этим значениям вШаблон sql с помощью макроса {{ params }}.

Текущая настройка

Вот соответствующие части моей настройки, слегка придуманные для ясности.

Определение DAG:

from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2019, 2, 27),
    'provide_context': True,
    'depends_on_past': True
}

dag = DAG(
    'etl',
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
    template_searchpath=tmpl_search_path,
    default_args=default_args,
    max_active_runs=1)

process_product_dim = ProcessDimensionOperator(
    task_id='process_product_dim',
    mysql_conn_id='mysql_dwh',
    sql='process_dimension.sql',
    database='dwh',
    col_names=[
        'id',
        'name',
        'category',
        'price',
        'available',
        'country',
    ],
    t_name='products',
    dag=dag)

Определение оператора:

from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class ProcessDimensionOperator(BaseOperator):
    template_fields = (
        'sql',
        'parameters')
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            sql,
            t_name,
            col_names,
            database,
            mysql_conn_id='mysql_default',
            *args, **kwargs):
        super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.t_name = t_name
        self.col_names = col_names
        self.database = database
        self.mysql_conn_id = mysql_conn_id
        self.parameters = parameters

    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)

        self.params['col_names'] = self.col_names
        self.params['t_name'] = self.t_name
        self.params['match_statement'] = self.construct_match_statement(self.col_names)

        hook.run(sql=self.sql)

    def construct_match_statement(self, cols):
        map_list = map(lambda x: f'and t.{x} = s.{x}', cols[1:])

        return ' '.join(map_list)

process_dimension.sql

create table if not exists staging.{{ params.t_name }};

select
    *
from
    source.{{ params.t_name }} as source
join
    target.{{ params.t_name }} as target
    on source.id = target.id {{ params.match_statement }}

Но это приводит к ошибкам, поскольку {{ params.t_name }} и {{ params.match_statement}} отображаются как нулевые.

Что я пробовал

  • Установка t_name и c_name в аргументе params при определении задачи и оставление логики map / join в шаблоне sql.Это работает, но я бы хотел по возможности исключить логику из шаблонов
  • Передача params={xxx} в super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)
  • Передача параметров в метод hook.run() как parameters={xxx} и шаблонизируя их с %(x)s, но это вызывает проблемы, так как он отображает кавычки вокруг переменных, которые путают различные операторы sql

Я довольно новичок в python и airflow, так что я вполне могу бытьпропустите что-то очевидное, любая помощь будет принята с благодарностью, спасибо!

1 Ответ

0 голосов
/ 23 мая 2019

То же самое здесь.Я просто потратил несколько часов (дней?), Чтобы выяснить причину проблемы (боже, храни IPython.embed и ведение журнала).Начиная с Airflow 1.10.3, это вызвано TaskInstance.render_templates (), который не будет обновлять контекст Jinja, только задачу attibute, после рендеринга любого из полей template_fields или template_exts.Посмотрите здесь !

Поэтому вам просто нужно использовать

{{ task.params.whatever }}

вместо

{{ params.whatever }}

В ваших файлах шаблонов .sql.

На самом деле, если контекст Jinja будет обновляться непрерывно, тогда действительно придется обращать внимание на порядок и зависимости шаблонов.Это своего рода вложенный / рекурсивный рендеринг.Это также может привести к снижению производительности.

Кроме того, я бы не рекомендовал использовать «параметры» (что не совпадает с «params»), поскольку они, как представляется, предназначены для передачи курсорам базы данных в качестве параметров.и тогда вы не сможете передавать числа / целые числа, имена столбцов или таблиц или просто фрагмент SQL (например, где, имея, предел, ...).

...