Воздушный поток on_failure_callback & SlackAPIPostOperator - PullRequest
0 голосов
/ 05 июля 2018

Попытка публикации в Slack с помощью on_failure_callback. У меня есть эта процедура ниже, и мои аргументы dag по умолчанию имеют

'on_failure_callback' : notify_slack_failure

Когда моя задача не выполняется, она создает вывод в /tmp..но мое сообщение Slack не публикуется. Когда я проверю это через командную строку, он отправит сообщение в Slack. Есть мысли о том, что мне не хватает?

def notify_slack_failure(context):
   """
   Define the callback to post on Slack if a failure is detected in the Workflow
   :return: operator.execute
   """

   cmd = "echo '" + getSlackToken() +"' > /tmp/a.out"
   os.system("touch /tmp/b.out")
   os.system(cmd)
   text_message='333'
   #text_message=str(context['task_instance'])

   operator = SlackAPIPostOperator(
      task_id='failure',
      text=text_message,
      token=getSlackToken(),
      channel=SLACK_CHANNEL,
      username=SLACK_USER
   )
   os.system("touch /tmp/e1.out")
   return operator.execute(context=context)

Ответы [ 2 ]

0 голосов
/ 06 июля 2018

В командной строке я определил мои переменные http_proxy и не установил их для процессов-демонов.

0 голосов
/ 05 июля 2018

Джордж -Добро пожаловать в ТАК! Попробуйте запустить этот код. Посмотрите, может ли Airflow публиковать сообщения в Slack, и записывает ли код записи в файл tmp.

def notify_slack_failure(contextDictionary, **kwargs):  
   """
   Define the callback to post on Slack if a failure is detected in the Workflow
   :return: operator.execute
   """
   text_message='333'
   #text_message=str(context['task_instance'])

   operator= SlackAPIPostOperator(
      task_id='failure',
      token=getSlackToken(),
      text=text_message,
      channel=SLACK_CHANNEL,
      username=SLACK_USER,)
      return operator.execute

Взято из ТАК ответ Slack Airflow

...