Воздушный поток - правильный способ обработки обратных вызовов DAG - PullRequest
0 голосов
/ 03 июля 2018

У меня есть DAG, и затем, когда он успешен или неуспешен, я хочу, чтобы он вызывал метод, который отправляет сообщения в Slack.

Мой DAG args, как показано ниже:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

И само определение DAG:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

Но когда я проверяю Slack, каждую минуту появляется более 100 сообщений, как будто он оценивает каждый тактовый сигнал планировщика и для каждого журнала он запускал метод успеха и сбоя, как если бы он работал и не работал для одной и той же задачи. экземпляр (не в порядке).

Как правильно использовать on_failure_callback и on_success_callback для обработки статусов Дагса и вызова пользовательского метода?

Ответы [ 2 ]

0 голосов
/ 03 июля 2018

Причина, по которой он создает сообщения, заключается в том, что когда вы определяете default_args, вы выполняете функции. Вам нужно просто передать определение функции, не выполняя ее.

Поскольку у функции есть аргумент, она станет немного хитрее. Вы можете определить две частичные функции или две функции-оболочки.

Так что вы можете сделать:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

или

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

В любом методе обратите внимание, как передаются только определения функций failure_msg и success_msg, а не результат, который они дают при выполнении.

0 голосов
/ 03 июля 2018

Какой метод slack вы имеете в виду? Планировщик анализирует ваш файл DAG при каждом ударе, поэтому если slack какая-то функция, определенная в вашем коде, она будет запускаться при каждом ударе.

Несколько вещей, которые вы можете попробовать:

  • Определите функции, которые вы хотите вызывать как PythonOperators, и затем вызывайте их на уровне задач, а не на уровне DAG.

  • Вы также можете использовать TriggerRules для задания задач после вашей задачи ETL, которые будут запускаться в зависимости от сбоя или успеха родительской задачи.

Из документов : defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}

Вы можете найти пример того, как это будет выглядеть здесь (полное раскрытие - я автор).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...