Оператор электронной почты Airflow Успех / Неудача - PullRequest
0 голосов
/ 24 марта 2020

В настоящее время я работаю с группой обеспечения доступности баз данных, которая будет отправлять по электронной почте список пользователей, независимо от того, успешно ли выполнена группа обеспечения доступности баз данных или произошла ошибка. Я пытаюсь, чтобы поток DAG выглядел следующим образом:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

def print_hello():
    return 'Hello world!'

default_args = {
        'owner': 'peter',
        'start_date':datetime(2018,8,11),
}

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='* * * * *',
          default_args = default_args, catchup=False)

hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

email_success = EmailOperator(
        task_id='send_email',
        to='to@gmail.com',
        subject='Airflow Alert Success',
        html_content=""" <h3>Email Test Success</h3> """,
        dag=dag
)

email_failure = EmailOperator(
        task_id='send_email',
        to='to@gmail.com',
        subject='Airflow Alert Failure',
        html_content=""" <h3>Email Test Failed</h3> """,
        dag=dag
)

hello_operator.set_downstream(email_success,email_failure)

Есть ли встроенный оператор, который я могу использовать с airflow, чтобы решить, отправляется ли оператор email_success после завершения DAG или если оператор email_failure выполняется при сбое группы DAG по какой-либо причине?

Спасибо

Ответы [ 3 ]

2 голосов
/ 25 марта 2020

Я думаю, что вы можете использовать BranchOperator, чтобы принять решение отправить электронное письмо о неудаче или успехе. У меня тот же сценарий, в то время как я отправляю письмо по ошибке и просто запускаю DummyOperator в случае успеха. enter image description here

Вы можете проверить этот связанный вопрос о branchOperator Как работает BranchPythonOperator Airflow?

2 голосов
/ 25 марта 2020

Я столкнулся с этим вопросом, когда искал, как заставить Airflow отправлять мне электронные письма в случае успеха.

Airflow может отправлять электронную почту при сбое или повтор с следующий в default_args.

default_args = {
    'email': ['some_email@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
 }

Это должно помочь в части вашего вопроса о сбое, и вы можете использовать ваш EmailOperator 'email_success' в качестве последней задачи в вашей последовательности.

0 голосов
/ 05 апреля 2020

Полагаю, вы ищете Правила триггера

Вы можете установить правило триггера для электронного письма, которое должно отправляться, когда ничего не выходит на all_success (по умолчанию), и Правило триггера для электронного письма, которое должно быть отправлено при ошибке на all_failed.

...