Электронная почта при сбое с использованием AWS SES в Apache Airflow DAG - PullRequest
0 голосов
/ 01 июня 2018

Я пытаюсь, чтобы Airflow отправлял мне электронное письмо с использованием AWS SES всякий раз, когда задача в моей группе доступности баз данных не запускается или повторяется.Я использую свои учетные данные AWS SES, а не общие учетные данные AWS.

Мой текущий airflow.cfg

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 25
smtp_mail_from = myemail@myjob.com

Текущая задача в моей группе обеспечения доступности баз данных, предназначенная дляпреднамеренный сбой и повторная попытка:

testfaildag_library_install_jar_jdbc = PythonOperator(
    task_id='library_install_jar',
    retries=3,
    retry_delay=timedelta(seconds=15),
    python_callable=add_library_to_cluster,
    params={'_task_id': 'cluster_create', '_cluster_name': CLUSTER_NAME, '_library_path':s3000://fakepath.jar},
    dag=dag,
    email_on_failure=True,
    email_on_retry=True,
    email=’myname@myjob.com’,
    provide_context=True
)

Все работает так, как задумано, так как задача повторяет заданное число раз и в конечном итоге дает сбой, за исключением того, что сообщения электронной почты не отправляются.Я также проверил журналы в упомянутой выше задаче, и smtp никогда не упоминается.

Я смотрел на подобный вопрос здесь , но единственное решение там не сработало для меня,Кроме того, документация Airflow, например их пример здесь , мне тоже не подходит.

Работает ли SES с функциями air_on_failure и email_on_retry?

В настоящее время я думаю о том, чтобы использовать функцию on_failure_callback для вызова скрипта Python, предоставляемого AWS здесь , для отправки электронного письма при сбое, но на данном этапе это не предпочтительный маршрут.

Спасибо, благодарю за любую помощь.

1 Ответ

0 голосов
/ 05 июня 2018

- обновлено 6/8 с работающим SES

Вот мое описание того, как все это работает.Внизу этого ответа есть небольшая сводка.

Пара важных моментов:

  1. Мы решили не использовать Amazon SES, а использовать sendmail Теперь у нас есть SES и он работает.
  2. Работник воздушного потока обслуживает функции email_on_failure и email_on_retry.Вы можете сделать journalctl –u airflow-worker –f, чтобы следить за ним во время запуска Dag.На рабочем сервере вам НЕ нужно перезапускать ваш airflow-worker после изменения airflow.cfg с новыми настройками smtp - он должен быть автоматически выбран.Не нужно беспокоиться о том, чтобы испортить работающий в данный момент Dags.

Вот техническое описание использования sendmail:

Поскольку мы перешли с ses на sendmail на localhost, мыпришлось изменить наши настройки SMTP в airflow.cfg.

Новый конфиг:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from =  myjob@mywork.com

Это работает как в производственном, так и в локальном экземплярах воздушного потока.

Некоторые распространенные ошибки, которые могут возникнуть, если их конфигурация не похожа на мою выше:

  • socket.error: [Errno 111] Connection refused - вы должны изменить smtp_host строку в airflow.cfg на localhost
  • smtplib.SMTPException: STARTTLS extension not supported by server. - вы должны изменить smtp_starttls вairflow.cfg до False

В моем локальном тестировании я попытался просто принудительно заставить поток воздуха показывать журнал того, что происходило, когда он пытался отправить электронное письмо - я создал поддельный ярлык какследует:

# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# General imports
from datetime import datetime,timedelta

def throwerror():
    raise ValueError("Failure")

SPARK_V_2_2_1 = '3.5.x-scala2.11'

args = {
    'owner': ‘me’,
    'email': ['me@myjob'],
    'depends_on_past': False,
    'start_date': datetime(2018, 5,24),
    'end_date':datetime(2018,6,28)
}

dag = DAG(
    dag_id='testemaildag',
    default_args=args,
    catchup=False,
    schedule_interval="* 18 * * *"
    )

t1 = DummyOperator(
    task_id='extract_data',
    dag=dag
)

t2 = PythonOperator(
    task_id='fail_task',
    dag=dag,
    python_callable=throwerror
)

t2.set_upstream(t1)

Если вы сделаете journalctl -u airflow-worker -f, вы увидите, что работник говорит, что отправил электронное письмо с предупреждением о сбое на адрес электронной почты в вашей группе обеспечения доступности баз данных, но мы все еще не получилиЭл. адрес.Затем мы решили просмотреть почтовые журналы sendmail, выполнив cat /var/log/maillog.Мы увидели бревно, похожее на это:

Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun  5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun  5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<myjob@mycompany.com>, size=1297, nrcpt=1 (queue active)
Jun  5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun  5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out

Так что это, наверное, самый большой момент "О, черт".Здесь мы можем увидеть, что на самом деле происходит в нашем SMTP-сервисе.Мы использовали telnet, чтобы подтвердить, что мы не смогли подключиться к целевым диапазонам IP-адресов из gmail.

Мы определили, что электронная почта пыталась быть отправленной, но службе sendmail не удалось подключиться к диапазонам ipуспешно.

Мы решили разрешить весь исходящий трафик через порт 25 в AWS (поскольку наша производственная среда воздушного потока является экземпляром ec2), и теперь он успешно работает.Теперь мы можем получать электронные письма о сбоях и повторных попытках (подсказка: email_on_failure и email_on_retry по умолчанию имеют значение True в вашем DAG API Reference - вам не нужно указывать это в своих аргументах, есливы не хотите этого делать, но все равно рекомендуется указывать в нем «Истина» или «Ложь».

SES теперь работает.Вот конфиг воздушного потока:

[email]
email_backend = airflow.utils.email.send_email_smtp


[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com 
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = myemail@myjob.com (Verified SES email)

Спасибо!

...