Функция sla_miss_callback воздушного потока не запускается - PullRequest
1 голос
/ 29 января 2020

Я пытался заставить слабый обратный вызов сообщения сработать при пропадании SLA. Я заметил, что:

  1. sla_misses успешно регистрируются в веб-интерфейсе Airflow по адресу slamiss / list /

  2. on_failure_callback работает успешно

Однако сама функция sla_miss_callback никогда не сработает.

Что я пробовал:

  • Различные комбинации, добавляющие 'sla' и 'sla_miss_callback' на уровне default_args, уровне dag и уровне задач

  • Проверка журналов нашего планировщика и рабочих для SLA связанные сообщения
    https://github.com/apache/airflow/blob/master/airflow/jobs/scheduler_job.py#L416, но мы ничего не видели

  • Функция обратного вызова слабого сообщения работает, если вызывается из любой другой задачи
    basi c или функция
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': send_task_failed_msg_to_slack,
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 2 minutes """
    time.sleep(90)
    LOGGER.info("Slept for 2 minutes")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


sleep = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
    )

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
    )

sleep >> simple_task

1 Ответ

1 голос
/ 30 марта 2020

Я сам столкнулся с этой проблемой. В отличие от on_failure_callback, который ищет вызываемую функцию python, похоже, что sla_miss_callback требуется полный вызов функции.

Пример, который работает для меня:

def sla_miss_alert(dag_id):
    """
    Function that alerts me that dag_id missed sla
    """
    <function code here>

def task_failure_alert(dag_id, context):
    """
    Function that alerts me that a task failed
    """
    <function code here>


dag_id = 'sla_test'
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': partial(task_failure_alert, dag_id),
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=sla_miss_alert(dag_id),
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

Насколько я могу судить, sla_miss_callback не имеет доступа к контексту, что вызывает сожаление. Как только я перестал искать контекст, я, наконец, получил свои предупреждения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...