Проблема
Я хотел бы передать список значений или даже любое значение в качестве аргумента пользовательскому оператору, изменить значения в операторе, а затем получить доступ к этим значениям вШаблон sql с помощью макроса {{ params }}
.
Текущая настройка
Вот соответствующие части моей настройки, слегка придуманные для ясности.
Определение DAG:
from airflow import DAG
from datetime import timedelta, datetime
from acme.operators.dwh_operators import ProcessDimensionOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2019, 2, 27),
'provide_context': True,
'depends_on_past': True
}
dag = DAG(
'etl',
schedule_interval=None,
dagrun_timeout=timedelta(minutes=60),
template_searchpath=tmpl_search_path,
default_args=default_args,
max_active_runs=1)
process_product_dim = ProcessDimensionOperator(
task_id='process_product_dim',
mysql_conn_id='mysql_dwh',
sql='process_dimension.sql',
database='dwh',
col_names=[
'id',
'name',
'category',
'price',
'available',
'country',
],
t_name='products',
dag=dag)
Определение оператора:
from airflow.hooks.mysql_hook import MySqlHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class ProcessDimensionOperator(BaseOperator):
template_fields = (
'sql',
'parameters')
template_ext = ('.sql',)
@apply_defaults
def __init__(
self,
sql,
t_name,
col_names,
database,
mysql_conn_id='mysql_default',
*args, **kwargs):
super(ProcessDimensionOperator, self).__init__(*args, **kwargs)
self.sql = sql
self.t_name = t_name
self.col_names = col_names
self.database = database
self.mysql_conn_id = mysql_conn_id
self.parameters = parameters
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
self.params['col_names'] = self.col_names
self.params['t_name'] = self.t_name
self.params['match_statement'] = self.construct_match_statement(self.col_names)
hook.run(sql=self.sql)
def construct_match_statement(self, cols):
map_list = map(lambda x: f'and t.{x} = s.{x}', cols[1:])
return ' '.join(map_list)
process_dimension.sql
create table if not exists staging.{{ params.t_name }};
select
*
from
source.{{ params.t_name }} as source
join
target.{{ params.t_name }} as target
on source.id = target.id {{ params.match_statement }}
Но это приводит к ошибкам, поскольку {{ params.t_name }}
и {{ params.match_statement}}
отображаются как нулевые.
Что я пробовал
- Установка
t_name
и c_name
в аргументе params
при определении задачи и оставление логики map / join в шаблоне sql.Это работает, но я бы хотел по возможности исключить логику из шаблонов - Передача
params={xxx}
в super(ProcessDimensionOperator, self).__init__(params=params, *args, **kwargs)
- Передача параметров в метод
hook.run()
как parameters={xxx}
и шаблонизируя их с %(x)s
, но это вызывает проблемы, так как он отображает кавычки вокруг переменных, которые путают различные операторы sql
Я довольно новичок в python и airflow, так что я вполне могу бытьпропустите что-то очевидное, любая помощь будет принята с благодарностью, спасибо!