поток воздуха on_failure_call_back теперь работает непрерывно - PullRequest
0 голосов
/ 30 ноября 2018

Я хочу сделать предупреждение о слабом состоянии (статус ошибки и статус успеха)

Теперь Dag работает, а когда статус равен sucssess, то работает сигнал слабого хода!но теперь on_failure_callback - статус ошибки работает непрерывно (один раз в 1 минуту)

Обратите внимание, что он продолжает отказывать.Но, это не работает, и я думаю, что это не реальный статус.

Как я могу изменить это сделать? ... Я хочу знать уведомление о реальном сбое

теперь нашАргумент task_default выглядит следующим образом.

dt = datetime.now(tz=tz.tzlocal())
task_default_args = {
    'owner': 'owner',
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2018, 11, 10),
    #'depends_on_past': False,
    'email': ['mail'],
    'email_on_failure': True,
    'email_on_retry': False,
    'on_failure_callback': send_slack(
        senderRole='airflow',
        receiverSubscribe='bot',
        level='info',
        text='= fail' + str(dt),
        X_CAG_AUTH='AG_CONSUMER_TOKEN access-key=500000000000',
    ),
    'execution_timeout': timedelta(minutes=30)
}

-- > Dag Contents like this 


start = DummyOperator(
    task_id='start',
    dag=dag)

tmp_slack_test_dag = PostgresOperator(pool=redshift_pool,
                      task_id='tmp_slack_test_sql',
                      postgres_conn_id=redshift_conn_id,
                      sql="""sql/tmp_.sql""",
                      parameters=None,
                      autocommit=True,
                      dag=dag
                    )

success_dummy = DummyOperator(
    task_id='success_dummy',
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS
)

alert_success_task = PythonOperator(
    task_id='alert_success',
    python_callable=lambda: send_slack(
        senderRole='airflow',
        receiverSubscribe='bot',
        level='info',
        text='success'+str(dt),
        X_CAG_AUTH='AG_CONSUMER_TOKEN access-key=500000000000'
    ),
    #depends_on_past=True,
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

start >> tmp_slack_test_dag >> success_dummy >> alert_success_task >> end

1 Ответ

0 голосов
/ 30 ноября 2018

Это потому, что вам нужно передать функцию к on_failure_callback, а не к выходу функции.

Измените ее на следующую, т.е. разделите вашу функцию оповещения и просто передайте имя функции на on_failure_callback

def slack_failed_task(contextDictionary, **kwargs):  
       failed_alert = SlackAPIPostOperator(
         task_id='slack_failed',
         channel="#datalabs",
         token="...",
         text = ':red_circle: DAG Failed',
         owner = '_owner',)
         return failed_alert.execute


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...