Чтение sql файла с шаблонами jinja в пользовательском операторе в Airflow - PullRequest
1 голос
/ 07 мая 2020

Я пытаюсь прочитать sql файл, содержащий запрос с шаблонами jinja в настраиваемом операторе в Airflow. Я уже добился этого с помощью PythonOperator, который вызывает функцию, в которой я использовал

def execute_query(**kwargs)
    sql_query = open('my_sql_query.sql').read()   #(SELECT * FROM my_table WHERE date > {})
    sql_query.format(kwargs['ds'])

, но я бы предпочел использовать этот синтаксис {{ds}} непосредственно в запросе, например SELECT * FROM my_table WHERE date > {{ ds }}

Что я сделал:

  1. Я создал CustomOperator с template_fields и template_ext
class SQLOperator(BaseOperator):

   template_fields = ['sql']
   template_ext = ('.sql',)

   @apply_defaults
   def __init__(
       self,
       name = None,
       sql = None,
       *args, **kwargs) -> None:
       super().__init__(*args, **kwargs)
       self.name = name
       self.sql = sql

    def execute(self, context):
       print("Name", name) # <- works
       print("Query", sql) # <- doesn't work and I don't know how to get the sql file content

Даг
default_args = {...}

dag = DAG(
    'sql_operator_test',
     schedule_interval='0 0 * * *',
     template_searchpath=['/Users/username/airflow/dags/sql/test/'],
     default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="/Users/username/airflow/dags/sql/test.sql",
    dag=dag)
SQL запрос
SELECT * FROM my_table WHERE date > {{ ds }}

У меня заканчиваются идеи. Есть ли возможность передать файл оператору или получить его визуализированное содержимое?

...