Я пытаюсь прочитать sql файл, содержащий запрос с шаблонами jinja в настраиваемом операторе в Airflow. Я уже добился этого с помощью PythonOperator, который вызывает функцию, в которой я использовал
def execute_query(**kwargs)
sql_query = open('my_sql_query.sql').read() #(SELECT * FROM my_table WHERE date > {})
sql_query.format(kwargs['ds'])
, но я бы предпочел использовать этот синтаксис {{ds}} непосредственно в запросе, например SELECT * FROM my_table WHERE date > {{ ds }}
Что я сделал:
- Я создал CustomOperator с template_fields и template_ext
class SQLOperator(BaseOperator):
template_fields = ['sql']
template_ext = ('.sql',)
@apply_defaults
def __init__(
self,
name = None,
sql = None,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
self.sql = sql
def execute(self, context):
print("Name", name) # <- works
print("Query", sql) # <- doesn't work and I don't know how to get the sql file content
Даг
default_args = {...}
dag = DAG(
'sql_operator_test',
schedule_interval='0 0 * * *',
template_searchpath=['/Users/username/airflow/dags/sql/test/'],
default_args=default_args)
sql_task = SQLOperator(
task_id='sql_process',
name="Aaa",
sql="/Users/username/airflow/dags/sql/test.sql",
dag=dag)
SQL запрос
SELECT * FROM my_table WHERE date > {{ ds }}
У меня заканчиваются идеи. Есть ли возможность передать файл оператору или получить его визуализированное содержимое?