У меня есть DAG
, и затем, когда он успешен или неуспешен, я хочу, чтобы он вызывал метод, который отправляет сообщения в Slack.
Мой DAG args
, как показано ниже:
default_args = {
[...]
'on_failure_callback': slack.slack_message(sad_message),
'on_success_callback': slack.slack_message(happy_message),
[...]
}
И само определение DAG
:
dag = DAG(
dag_id = dag_name_id,
default_args=default_args,
description='load data from mysql to S3',
schedule_interval='*/10 * * * *',
catchup=False
)
Но когда я проверяю Slack, каждую минуту появляется более 100 сообщений, как будто он оценивает каждый тактовый сигнал планировщика и для каждого журнала он запускал метод успеха и сбоя, как если бы он работал и не работал для одной и той же задачи. экземпляр (не в порядке).
Как правильно использовать on_failure_callback
и on_success_callback
для обработки статусов Дагса и вызова пользовательского метода?