Воздушный поток - отправлено 2 предупреждения on_failure - PullRequest
4 голосов
/ 17 июня 2020

У меня странная ошибка в Airflow 1.10. Я хотел попробовать отправить электронное письмо и уведомление в Microsoft Teams. Я сделал небольшой тупой DAG, чтобы попробовать. Все работает нормально, но я получил 2 уведомления подряд. 2 электронных письма и 2 сообщения в командах.

Я использовал это для команд: https://github.com/mendhak/Airflow-MS-Teams-Operator

Здесь даги:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from operators.ms_teams_webhook_operator import MSTeamsWebhookOperator
from airflow.utils.email import send_email_smtp

default_args = {
    "owner": "me",
    "depends_on_past": False,
    "start_date": datetime(2020, 6, 15),
    'email_on_failure': False
}


def on_failure(context):
    dag_id = context['dag_run'].dag_id

    task_id = context['task_instance'].task_id
    # context['task_instance'].xcom_push(key=dag_id, value=True)

    logs_url = f"https://myairflow/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={context['ts']}"

    teams_notification = MSTeamsWebhookOperator(
        task_id="msteams_notify_failure",
        trigger_rule="all_done",
        message=f"{dag_id} has failed on task: {task_id}",
        button_text="View log",
        button_url=logs_url,
        theme_color="FF0000",
        http_conn_id='msteams-python-webhook')
    teams_notification.execute(context)

    title = f"Titre {dag_id} - {task_id}"
    body = title

    send_email_smtp("gil.felot@lisea.fr", title, body)


def print_fail():
    print("Hello !")
    exit(1)


with DAG(
        "test_email2",  # ICI
        default_args=default_args,
        schedule_interval=None
) as dag:
    preprocessing_started = DummyOperator(
        task_id="go_email_go"
    )

    python_fail = PythonOperator(
        task_id="pyhton_def",
        python_callable=print_fail,
        on_failure_callback=on_failure,
        email_on_failure=False
    )

preprocessing_started >> python_fail

РЕДАКТИРОВАТЬ:

Вместо этого используйте крючок. Теперь ничего не срабатывает

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email_smtp
from hooks.ms_teams_webhook_hook import MSTeamsWebhookHook


def on_failure(context):
    dag_id = context['dag_run'].dag_id

    task_id = context['task_instance'].task_id
    # context['task_instance'].xcom_push(key=dag_id, value=True)

    logs_url = f"https://myairflow/admin/airflow/log?dag_id={dag_id}&task_id={task_id}&execution_date={context['ts']}"

    teams_notification_hook = MSTeamsWebhookHook(
        http_conn_id='msteams-python-webhook',
        message=f"Le DAG {dag_id} a échoué sur la tâche : {task_id}",
        subtitle="Voir les logs ?",
        button_text="Logs",
        button_url=logs_url,
        theme_color="FF0000"
    )
    teams_notification_hook.execute()

    title = f"Titre {dag_id} - {task_id}"
    body = title

    send_email_smtp("my@email.fr", title, body)


def on_success(context):
    print("OK callback")
    dag_id = context['dag_run'].dag_id

    for i in context.items():
        print(i)

    teams_notification_hook = MSTeamsWebhookHook(
        http_conn_id='msteams-python-webhook',
        message=f"Le DAG {dag_id} s'est terminé avec succès",
        theme_color="00EE00"
    )
    teams_notification_hook.execute(context)

    title = f"Titre {dag_id} - Success"
    body = title

    send_email_smtp("my@email.fr", title, body)


default_args = {
    "owner": "lisea-mesea",
    "depends_on_past": False,
    "start_date": datetime(2020, 6, 15),
    "email_on_failure": False,
    "on_failure_callback": on_success
    # "on_failure_callback": on_failure
}


def print_fail():
    print("Hello !")
    exit(1)


with DAG(
        "test_email2",  # ICI
        default_args=default_args,
        schedule_interval=None
) as dag:
    preprocessing_started = DummyOperator(
        task_id="go_email_go"
    )

    python_fail = PythonOperator(
        task_id="pyhton_def",
        python_callable=print_fail,
        # on_failure_callback=on_failure,
        email_on_failure=False
    )

preprocessing_started >> python_fail

1 Ответ

3 голосов
/ 25 июня 2020

Я настоятельно рекомендую использовать MSTeamsWebhookHook в on_failure_callback вместо MSTeamsWebhookOperator.

Не вдаваясь в сорняки, BaseOperator из которого MSTeamsWebhookOperator Наследует пытается настроить его на текущий dag при его создании.

Можно увидеть, что установщик свойства dag регистрирует экземпляр задачи на dag.

Это означает, что помимо ручного выполнения оператора, имеющего место в on_failure_callback, запланирован экземпляр задачи для MSTeamsWebhookOperator. Последнее не предназначено, поскольку мы заботимся только о возможности отправлять уведомления, предоставленные в хуке, когда задача не выполняется.

Это не дает объяснения, почему электронные письма отправляются дважды, поскольку это не похоже, использует оператора. Это требует отдельного расследования.

...