Как передать параметры в Airflow on_success_callback и on_failure_callback - PullRequest
0 голосов
/ 21 июня 2019

Я реализовал оповещения по электронной почте об успехах и неудачах, используя on_success_callback и on_failure_callback.

Согласно Документация по воздушному потоку ,

словарь контекста передается какодин параметр этой функции.

Как передать другой параметр этим методам обратного вызова?

Вот мой код

from airflow.utils.email import send_email_smtp

def task_success_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Success".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Succeeded on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

def task_failure_alert(context):
    subject = "[Airflow] DAG {0} - Task {1}: Failed".format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1]
        )
    html_content = """
    DAG: {0}<br>
    Task: {1}<br>
    Failed on: {2}
    """.format(
        context['task_instance_key_str'].split('__')[0], 
        context['task_instance_key_str'].split('__')[1], 
        datetime.now()
        )
    send_email_smtp(dag_vars["dev_mailing_list"], subject, html_content)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

Я намереваюсь переместитьобратные вызовы в другой пакет и передачу адреса электронной почты в качестве параметра.

1 Ответ

1 голос
/ 21 июня 2019

Вы можете определить функцию внутри вашего dag, которая вызывает функцию из вашего пакета.И при вызове этой функции передайте электронную почту в качестве аргумента.Вы можете уточнить его на своем уровне DAG, чтобы передавать только информацию, необходимую для электронных писем.

from package import outer_task_success_callback
email = 'xyz@example.com'

def task_success_alert(context):
    dag_id = context['dag'].dag_id
    task_id = context['task_instance']. task_id
    outer_task_success_callback(dag_id, tasak_id, email)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 13),
    'on_success_callback': task_success_alert,
    'on_failure_callback': task_failure_alert
}

Это позволит вам выполнить настройку перед вызовом функции в вашем пакете.

Напримечание: у airflow есть функция электронной почты smtp.Вместо того, чтобы писать собственное решение, вы можете использовать их.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...