Идентификатор электронной почты переменной шаблона Jinja не отображается при использовании ON_FAILURE_CALLBACK - PullRequest
1 голос
/ 28 мая 2020

Нужна помощь в отображении идентификатора электронной почты шаблона jinja в On_failure_callback.

Я понимаю, что шаблоны визуализации отлично работают в файле SQL или с оператором, имеющим template_fields. Как получить приведенный ниже код, отображающий переменная шаблона jinja

Она отлично работает с Variable.get ('email_edw_alert'), но я не хочу использовать метод Variable, чтобы избежать попадания в DB

Ниже приведен файл Dag

import datetime
import os
from functools import partial
from datetime import timedelta
from airflow.models import DAG,Variable
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from alerts.email_operator import dag_failure_email


def get_db_dag(
    *,
    dag_id,
    start_date,
    schedule_interval,
    max_taskrun,
    max_dagrun,
    proc_nm,
    load_sql
):

    default_args = {
        'owner': 'airflow',
        'start_date': start_date,
        'provide_context': True,
        'execution_timeout': timedelta(minutes=max_taskrun),
        'retries': 0,
        'retry_delay': timedelta(minutes=3),
        'retry_exponential_backoff': True,
        'email_on_retry': False,
    }


    dag = DAG(
        dag_id=dag_id,
        schedule_interval=schedule_interval,
        dagrun_timeout=timedelta(hours=max_dagrun),
        template_searchpath=tmpl_search_path,
        default_args=default_args,
        max_active_runs=1,
        catchup='{{var.value.dag_catchup}}',
        on_failure_callback=partial(dag_failure_email, config={'email_address': '{{var.value.email_edw_alert}}'}),
    )


    load_table = SnowflakeOperator(
        task_id='load_table',
        sql=load_sql,
        snowflake_conn_id=CONN_ID,
        autocommit=True,
        dag=dag,
    )

    load_table

    return dag

# ======== DAG DEFINITIONS #

edw_table_A = get_db_dag(
    dag_id='edw_table_A',
    start_date=datetime.datetime(2020, 5, 21),
    schedule_interval='0 5 * * *',
    max_taskrun=3,  # Minutes
    max_dagrun=1,  # Hours
    load_sql='recon/extract.sql',
)

Ниже приведен код python alert.email_operator

import logging
from airflow.utils.email import send_email
from airflow.models import Variable

logger = logging.getLogger(__name__)

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

def dag_failure_email(context, config=None):

    config = {} if config is None else config
    task_id = context.get('task_instance').task_id
    dag_id = context.get("dag").dag_id
    execution_time = context.get("execution_date").strftime(TIME_FORMAT)
    reason = context.get("exception")

    alerting_email_address = config.get('email_address')

    dag_failure_html_body = f"""<html>
    <header><title>The following DAG has failed!</title></header>
    <body>
    <b>DAG Name</b>: {dag_id}<br/>
    <b>Task Id</b>: {task_id}<br/>
    <b>Execution Time (UTC)</b>: {execution_time}<br/>
    <b>Reason for Failure</b>: {reason}<br/>
    </body>
    </html>
    """

    try:
        if reason != 'dagrun_timeout':
            send_email(
                to=alerting_email_address,
                subject=f"Airflow alert: <DagInstance: {dag_id} - {execution_time} [failed]",
                html_content=dag_failure_html_body,
            )
    except Exception as e:
        logger.error(
            f'Error in sending email to address {alerting_email_address}: {e}',
            exc_info=True,
        )

Я тоже пробовал другой способ, даже ниже он не работает

   try:
        if reason != 'dagrun_timeout':
            send_email = EmailOperator(
               to=alerting_email_address,
               task_id='email_task',
               subject=f"Airflow alert: <DagInstance: {dag_id} - {execution_time} [failed]",
               params={'content1': 'random'},
               html_content=dag_failure_html_body,
           )
            send_email.dag = context['dag']
            #send_email.to = send_email.get_template_env().from_string(send_email.to).render(**context)
            send_email.to = send_email.render_template(alerting_email_address, send_email.to, context)
            send_email.execute(context)
    except Exception as e:
        logger.error(
            f'Error in sending email to address {alerting_email_address}: {e}',
            exc_info=True,
        )

1 Ответ

1 голос
/ 28 мая 2020

Я не думаю, что шаблоны будут работать таким образом - вам понадобится что-то специально для анализа шаблона. Обычно шаблоны jinja в Airflow используются для передачи шаблонных полей операторам и отображаются с помощью функции render_template (https://airflow.apache.org/docs/stable/_modules/airflow/models/baseoperator.html#BaseOperator .render_template )

Поскольку ваша функция обратного вызова не является оператор, у него не будет этого метода по умолчанию.

Я думаю, что лучше всего здесь будет либо явно вызвать Variable.get во время выполнения самой функции обратного вызова, а не в определении DAG, или реализовать некоторую версию этой функции render_template_fields в обратном вызове. Оба этих решения приведут к обращению к базе данных только во время выполнения этой задачи, а не всякий раз, когда создается группа DAG.

Изменить: только что видел вашу попытку выполнить рендеринг явно с помощью оператора. Указаны ли поля, которые вы хотите создать в шаблоне, как templated_fields в операторе электронной почты?

...