Воздушный поток SSHOperator, как определить on_failure_callback - PullRequest
0 голосов
/ 16 января 2020

Привет! Я работаю с оповещениями Slack API о потоке воздуха, который при попытке работает для SparkSubmitOperator, но, похоже, не работает с SSHOperator. Как мне вызвать on_failure_callback функциональность от SSHOperator

SLACK_CONN_ID = 'slack'

def task_fail_slack_alert(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
            :red_circle: Task Failed. 
            *Task*: {task}  
            *Dag*: {dag} 
            *Execution Time*: {exec_date}  
            *Log Url*: {log_url} 
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            ti=context.get('task_instance'),
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
        )
    failed_alert = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow')
    return failed_alert.execute(context=context)

player_match = SSHOperator(
    ssh_hook=player_pricing_hook,
    task_id='player_match_summary',
    command='python3 player_match_summary.py',
    on_failure_callback=task_fail_slack_alert,
    dag=dag)
...