используя плагин airflow для импорта внешней функции python - PullRequest
1 голос
/ 11 марта 2019

Вкл. on_failure_callback Я вызываю sns метод для отправки уведомления, который работает нормально.
Я хочу объявить функцию sns отдельной функцией, используя плагины воздушного потока.

import boto3
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

# t1, t2, t3 and t4 are examples of tasks created using operators

def sns(state):
  client = boto3.client('sns',aws_access_key_id='abcd',aws_secret_access_key='abcd',region_name='us-east-1')
  response = client.publish(
    TargetArn='Topicarn',
    Message='Test')

t3 = BashOperator(
    task_id='task_3',
    bash_command='fail',
    on_failure_callback=sns,
    dag=dag)

Спасибо
Раджив

Ответы [ 2 ]

1 голос
/ 11 марта 2019

(отвечая на реальные вопросы, опубликованные в комментариях, переместите их на исходный вопрос)

  1. Поскольку это будет в основном оператор уведомления, вы должны наследовать от BaseOperator.
  2. Папка плагина обычно находится в корне проекта, рядом с папкой dag.
  3. Что касается самого плагина, то после наследования от AirflowPlugin он будет доступен для импорта как airflow.operators.my_operator.Вы можете использовать его как любой другой оператор.
0 голосов
/ 12 марта 2019

Ниже код работал для меня:

код плагина:

import boto3
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults

class snsoperator(BaseOperator):
  @apply_defaults
  def __init__(self, *args, **kwargs):

   super(snsoperator, self).__init__(*args, **kwargs)

  def __call__(self, *args, **kwargs):
    print("calling execute")
    client = boto3.client('sns', aws_access_key_id='abc',
                              aws_secret_access_key='abc', region_name='us-east-1')
    response = client.publish(
            TargetArn='abc',
            Message='Test')

class snsplugin(AirflowPlugin):
  name = "snsplugin"
  operators = [snsoperator]

код DAG:

import boto3
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.operators.snsplugin import snsoperator

t3 = BashOperator(
    task_id='task_3',
    bash_command='fail',
    on_failure_callback=snsoperator(task_id='sns'),
    dag=dag)
...