Получение шаблона Ошибка при запуске SQL с использованием воздушного потока от python? - PullRequest
4 голосов
/ 14 апреля 2020

Я пытаюсь запустить скрипт SQL из Airflow. Сбой из-за ошибки шаблона.

Сценарий в основном пытается запустить sql из Афины и загрузить в таблицу Redshift.

SQl находится по адресу: redshift/sql/public/flow/KN_AWS_RE_ShipmentData_dup_update.sql

Мой код воздушного потока

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('sample_sql', default_args=default_args, schedule_interval='0 4 * * *')

execute_notebook = PostgresOperator(
    task_id='sample_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql="redshift/sql/public/flow/KN_AWS_RE_ShipmentData_dup_update.sql",
    params={'limit': '50'},
    dag=dag
)

Ошибка

[2020-04-14 02:19:24,412] {{standard_task_runner.py:52}} INFO - Started process 23012 to run task
[2020-04-14 02:19:24,482] {{logging_mixin.py:112}} INFO - [2020-04-14 02:19:24,481] {{dagbag.py:403}} INFO - Filling up the DagBag from /usr/local/airflow/dags/Scripts/Sample.py
[2020-04-14 02:19:24,495] {{baseoperator.py:807}} ERROR - KN_AWS_RE_ShipmentData_dup_update.sql
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 805, in resolve_template_files
    setattr(self, field, env.loader.get_source(env, content)[0])
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 187, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: KN_AWS_RE_ShipmentData_dup_update.sql
[2020-04-14 02:19:24,545] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: sample_sql.sample_sql 2020-04-14T02:14:08.020072+00:00 [running]> 0ca54c719ff7
[2020-04-14 02:19:24,583] {{taskinstance.py:1088}} ERROR - KN_AWS_RE_ShipmentData_dup_update.sql
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 940, in _run_raw_task
    self.render_templates(context=context)

Как решить эту проблему?

1 Ответ

1 голос
/ 14 апреля 2020

Необходимо указать путь к файлу шаблона .sql при создании экземпляра группы доступности базы данных с переменной template_searchpath. По умолчанию Jinja просматривает вашу папку DAG.

Обратите внимание, что в вашей группе DAG есть одна плохая практика, которая имеет start_date, который является динамическим c. start_date должно быть фиксированным (т.е. datetime(2020,4,13)) вместо динами c (т.е. datetime.now()). Вы можете прочитать больше об этом здесь .

При этом я попытаюсь изменить определение DAG на следующее:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

from utils import FAILURE_EMAILS

# Remove this
# yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020,4,13),  # Change this
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'sample_sql', 
    default_args=default_args, 
    schedule_interval='0 4 * * *',
    template_searchpath='/redshift/sql/public/flow/')

execute_notebook = PostgresOperator(
    task_id='sample_sql',
    postgres_conn_id='REDSHIFT_CONN',
    sql='KN_AWS_RE_ShipmentData_dup_update.sql',
    params={'limit': '50'},
    dag=dag
)

execute_notebook  # Tell airflow the tasks dependencies, in this case no dependency

Конечно, вы должны выбрать правильный абсолютный базовый путь для присвоения template_searchpath, поэтому что-то вроде /home/redshift/sql/public/flow.

...