Уведомления продолжают публиковаться на слабом канале - PullRequest
2 голосов
/ 31 января 2020

Я использую слабые webhooks для отправки сообщения на мой свободный канал. Проблема в том, что он продолжает публиковать сообщения каждые несколько минут. Вот что я сделал ...

Я создал простую функцию в папке util.

def send_to_slack(text):
    conn_id = "https://hooks.slack.com/services/your/slack/URL"    
    task_slack_alert(text, url, is_error=False, args=None)

def task_slack_alert(msg, url, is_error=False, args=None):
     slack_msg = ":red_circle: Task Failed" if is_error else ":green_heart: Task Message"
     """*Task*: {task}  
        *Dag*: {dag} 
        *Execution Time*: {exec_ts}""".format(
                                          task=args["task"],
                                          dag=args["dag"],
                                          exec_ts=args["ts"],
        ) if args else ""
        message = {'text': + msg}
        response = requests.post(url=url, data=json.dumps(message))
        time.sleep(1)
        print(f"Slack response {response}")
        if response.status_code != 200:
            print(f"Error sending chat message. Got: {response.status_code}")

В моем dag (который находится в другой папке) я вызываю функцию Dag копировать данные из oracle для снежной базы, и это работает без слабины. Внутри моего тега я делаю следующее:

 x = {‘key1’: [‘value1’, ‘value 2’, … ‘value10]}
    
    send_to_slack('My test message from python')

default_args = {... 
'on_failure_callback': send_to_slack, }

with DAG(‘my_dag’,
         default_args=default_args,
         catchup=False) as dag:
    parallel = 4
    start = DummyOperator(task_id='start')
    tasks = []
    i = 0
    for s in x.keys():
        for t in x.get(s):
            task = OracleToSnowflakeOperator(                
                task_id=s + '_' + t,
                source_oracle_conn_id=source_oracle_conn_id,
                source_schema=schema,
                source_table=table,…
            )
            if i <= parallel:
                task.set_upstream(start)
            else:
                task.set_upstream(tasks[i - (parallel + 1)])
            i = i + 1
            tasks.append(task)

Я знаю, если я определю функцию внутри того же тега, она будет вызываться каждый раз, когда анализируется. не мое дело, так что не так? Спасибо

1 Ответ

0 голосов
/ 02 февраля 2020

Вы вызываете функцию send_to_slack внутри вашего файла DAG, это означает, что она будет запускаться каждый раз, когда планировщик оценивает ваш DAG (каждые несколько минут).

Вы должны либо:

  • Использовать оператор slack , поставляемый с Airflow, поставить его ниже по потоку от вашего OracleToSnowflakeOperator и обращаться с ним как с любым другим оператором
  • Отредактируйте ваш OracleToSnowflakeOperator, который, как я полагаю, является нестандартным, и поместите в него логику c для вызова Slack (используйте слабину)

По сути, вы должны поставить Инкапсулируйте вызов Slack внутри пользовательского оператора или используйте предоставленный стандартный оператор Slack, не помещайте его в определение DAG.

...