У нас есть много DAG, работающих на Airflow. Когда что-то не получается, мы хотим получить уведомление или предпринять конкретное действие: я попытался через декоратор
def on_failure_callback(f):
@wraps(f)
def wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
return f"An exception {e} on ocurred on{f}"
return wrap
Это работает, но необходимо украсить любую функцию, для которой мы хотим иметь такое поведение.
Я видел это и попытался реализовать это так:
def on_failure_callback(context):
operator = PythonOperator(
python_callable=failure)
return operator.execute(context=context)
def failure():
return 'Failure in the failure func'
dag_args = {
"retries": 2,
"retry_delay": timedelta(minutes=2),
'on_failure_callback': on_failure_callback
}
А затем в определении DAG я использую [...] default_args=dag_args [...]
, но эта опция не работает.
Каков наилучший способ сделать это?
Спасибо