Поток воздуха, при неудаче, заставляет всех даг делать что-то конкретное - PullRequest
0 голосов
/ 11 мая 2019

У нас есть много DAG, работающих на Airflow. Когда что-то не получается, мы хотим получить уведомление или предпринять конкретное действие: я попытался через декоратор

def on_failure_callback(f):
  @wraps(f)
  def wrap(*args, **kwargs):
    try:
      return f(*args, **kwargs)
    except Exception as e:
      return f"An exception {e} on ocurred on{f}" 
  return wrap

Это работает, но необходимо украсить любую функцию, для которой мы хотим иметь такое поведение.

Я видел это и попытался реализовать это так:

def on_failure_callback(context):
    operator = PythonOperator(
        python_callable=failure)

    return operator.execute(context=context)


def failure():
    return 'Failure in the failure func'


dag_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    'on_failure_callback': on_failure_callback
}

А затем в определении DAG я использую [...] default_args=dag_args [...], но эта опция не работает.

Каков наилучший способ сделать это?

Спасибо

Ответы [ 2 ]

1 голос
/ 15 мая 2019

Самый простой способ, IMO, это определить его в качестве аргумента по умолчанию в случае сбоя DAG.

default_args = {'owner': 'airflow', 'depen_on_past': False, 'start_date': datetime(2015, 6, 1), 'email': ['airflow@example.com'], 'email_on_failure': False, ' email_on_retry': False , 'retries':1, 'retry_delay': timedelta (minutes = 5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime (2016, 1, 1),}

Вы также можете использовать оператор sendgrid, если вы хотите указать поведение отправки электронной почты в зависимости от вашей задачи.https://github.com/apache/airflow/blob/master/airflow/contrib/utils/sendgrid.py

1 голос
/ 13 мая 2019

Самый простой способ: воздушный поток отправляет почту при повторных попытках и сбое, если атрибуты email_on_retry и email_on_failure от BaseOperator имеют значение true (по умолчанию true) и настроена конфигурация почты воздушного потока.

С пользовательским оператором:

def on_failure_callback(context):
    # with mail:
    error_mail = EmailOperator(
        task_id='error_mail',
        to='user@example.com',
        subject='Fail',
        html_content='a task failed',
        mime_charset='utf-8')
    error_mail.execute({})  # no need to return, just execute

    # with slack:
    error_message = SlackAPIPostOperator(
        task_id='error_message',
        token=getSlackToken(),
        text='a task failed',
        channel=SLACK_CHANNEL,
        username=SLACK_USER)
    error_message.execute({})  # no need to return, just execute

dag_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
    'on_failure_callback': on_failure_callback
}
...