Я сам столкнулся с этой проблемой. В отличие от on_failure_callback
, который ищет вызываемую функцию python, похоже, что sla_miss_callback
требуется полный вызов функции.
Пример, который работает для меня:
def sla_miss_alert(dag_id):
"""
Function that alerts me that dag_id missed sla
"""
<function code here>
def task_failure_alert(dag_id, context):
"""
Function that alerts me that a task failed
"""
<function code here>
dag_id = 'sla_test'
default_args = {
"owner": "airflow",
"depends_on_past": False,
'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
'on_failure_callback': partial(task_failure_alert, dag_id),
'sla': timedelta(minutes=1),
"retries": 0,
"pool": 'canary',
'priority_weight': 1
}
dag = airflow.DAG(
dag_id='sla_test',
default_args=default_args,
sla_miss_callback=sla_miss_alert(dag_id),
schedule_interval='*/5 * * * *',
catchup=False,
max_active_runs=1,
dagrun_timeout=timedelta(minutes=5)
)
Насколько я могу судить, sla_miss_callback не имеет доступа к контексту, что вызывает сожаление. Как только я перестал искать контекст, я, наконец, получил свои предупреждения.