- обновлено 6/8 с работающим SES
Вот мое описание того, как все это работает.Внизу этого ответа есть небольшая сводка.
Пара важных моментов:
- Мы решили не использовать Amazon SES, а использовать sendmail Теперь у нас есть SES и он работает.
- Работник воздушного потока обслуживает функции
email_on_failure
и email_on_retry
.Вы можете сделать journalctl –u airflow-worker –f
, чтобы следить за ним во время запуска Dag.На рабочем сервере вам НЕ нужно перезапускать ваш airflow-worker после изменения airflow.cfg
с новыми настройками smtp - он должен быть автоматически выбран.Не нужно беспокоиться о том, чтобы испортить работающий в данный момент Dags.
Вот техническое описание использования sendmail:
Поскольку мы перешли с ses на sendmail на localhost, мыпришлось изменить наши настройки SMTP в airflow.cfg
.
Новый конфиг:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = localhost
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
#smtp_user = not used
#smtp_password = not used
smtp_port = 25
smtp_mail_from = myjob@mywork.com
Это работает как в производственном, так и в локальном экземплярах воздушного потока.
Некоторые распространенные ошибки, которые могут возникнуть, если их конфигурация не похожа на мою выше:
socket.error: [Errno 111] Connection refused
- вы должны изменить smtp_host
строку в airflow.cfg
на localhost
smtplib.SMTPException: STARTTLS extension not supported by server.
- вы должны изменить smtp_starttls
вairflow.cfg
до False
В моем локальном тестировании я попытался просто принудительно заставить поток воздуха показывать журнал того, что происходило, когда он пытался отправить электронное письмо - я создал поддельный ярлык какследует:
# Airflow imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
# General imports
from datetime import datetime,timedelta
def throwerror():
raise ValueError("Failure")
SPARK_V_2_2_1 = '3.5.x-scala2.11'
args = {
'owner': ‘me’,
'email': ['me@myjob'],
'depends_on_past': False,
'start_date': datetime(2018, 5,24),
'end_date':datetime(2018,6,28)
}
dag = DAG(
dag_id='testemaildag',
default_args=args,
catchup=False,
schedule_interval="* 18 * * *"
)
t1 = DummyOperator(
task_id='extract_data',
dag=dag
)
t2 = PythonOperator(
task_id='fail_task',
dag=dag,
python_callable=throwerror
)
t2.set_upstream(t1)
Если вы сделаете journalctl -u airflow-worker -f
, вы увидите, что работник говорит, что отправил электронное письмо с предупреждением о сбое на адрес электронной почты в вашей группе обеспечения доступности баз данных, но мы все еще не получилиЭл. адрес.Затем мы решили просмотреть почтовые журналы sendmail, выполнив cat /var/log/maillog
.Мы увидели бревно, похожее на это:
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: connect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: ID: client=localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/cleanup[port]: ID: message-id=<randomMessageID@production-server-ip-range-ec2-instance>
Jun 5 14:10:25 production-server-ip-range postfix/smtpd[port]: disconnect from localhost[127.0.0.1]
Jun 5 14:10:25 production-server-ip-range postfix/qmgr[port]: MESSAGEID: from=<myjob@mycompany.com>, size=1297, nrcpt=1 (queue active)
Jun 5 14:10:55 production-server-ip-range postfix/smtp[port]: connect to aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Jun 5 14:11:25 production-server-ip-range postfix/smtp[port]: connect to alt1.aspmx.l.google.com[smtp-ip-range]:25: Connection timed out
Так что это, наверное, самый большой момент "О, черт".Здесь мы можем увидеть, что на самом деле происходит в нашем SMTP-сервисе.Мы использовали telnet, чтобы подтвердить, что мы не смогли подключиться к целевым диапазонам IP-адресов из gmail.
Мы определили, что электронная почта пыталась быть отправленной, но службе sendmail не удалось подключиться к диапазонам ipуспешно.
Мы решили разрешить весь исходящий трафик через порт 25 в AWS (поскольку наша производственная среда воздушного потока является экземпляром ec2), и теперь он успешно работает.Теперь мы можем получать электронные письма о сбоях и повторных попытках (подсказка: email_on_failure
и email_on_retry
по умолчанию имеют значение True
в вашем DAG API Reference - вам не нужно указывать это в своих аргументах, есливы не хотите этого делать, но все равно рекомендуется указывать в нем «Истина» или «Ложь».
SES теперь работает.Вот конфиг воздушного потока:
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = emailsmtpserver.region.amazonaws.com
smtp_starttls = True
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = REMOVEDAWSACCESSKEY
smtp_password = REMOVEDAWSSECRETACCESSKEY
smtp_port = 587
smtp_mail_from = myemail@myjob.com (Verified SES email)
Спасибо!