Привет! Я работаю с оповещениями Slack API о потоке воздуха, который при попытке работает для SparkSubmitOperator
, но, похоже, не работает с SSHOperator
. Как мне вызвать on_failure_callback
функциональность от SSHOperator
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
failed_alert = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack',
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow')
return failed_alert.execute(context=context)
player_match = SSHOperator(
ssh_hook=player_pricing_hook,
task_id='player_match_summary',
command='python3 player_match_summary.py',
on_failure_callback=task_fail_slack_alert,
dag=dag)