Приведённый ниже код у меня заработал:
Код плагина:
import boto3
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
class snsoperator(BaseOperator):
    """Operator that publishes a fixed test message to an AWS SNS target.

    Instances are callable, so one can be handed directly to another task
    as its ``on_failure_callback``. The operator can also be scheduled as a
    regular task, in which case ``execute`` performs the same publish.
    """

    @apply_defaults
    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``BaseOperator``."""
        super(snsoperator, self).__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        """Publish the notification to SNS.

        Any positional or keyword arguments (for example a context passed
        by the caller of a callback) are accepted and ignored.

        Returns:
            The value returned by ``client.publish``.
        """
        print("calling execute")
        # NOTE(review): the 'abc' values look like placeholders; real
        # credentials should not be hard-coded in a plugin -- confirm how
        # they are meant to be supplied before deploying.
        client = boto3.client(
            'sns',
            aws_access_key_id='abc',
            aws_secret_access_key='abc',
            region_name='us-east-1',
        )
        return client.publish(TargetArn='abc', Message='Test')

    def execute(self, context):
        """Publish the notification when the operator is run as a task."""
        # Delegate to __call__ so callback usage and task usage share one
        # code path. The publish response is intentionally not returned
        # from here.
        self(context)
class snsplugin(AirflowPlugin):
    """Airflow plugin that registers ``snsoperator`` under the name ``snsplugin``."""

    # Plugin name; the DAG code imports the operator from
    # ``airflow.operators.snsplugin``.
    name = "snsplugin"

    # Operator classes contributed by this plugin.
    operators = [
        snsoperator,
    ]
Код DAG:
import boto3
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.operators.snsplugin import snsoperator
# Task whose failure triggers the SNS notification.
# NOTE(review): `dag` is not defined in this snippet -- it is expected to be
# created earlier in the DAG file.
t3 = BashOperator(
    dag=dag,
    task_id='task_3',
    # Presumably 'fail' is not a valid shell command, so the task fails and
    # the failure callback is exercised -- confirm.
    bash_command='fail',
    # The operator instance is callable, so it can serve as the callback.
    on_failure_callback=snsoperator(task_id='sns'),
)