Как интегрировать Apache Airflow со слабиной? - PullRequest
0 голосов
/ 28 августа 2018

Может кто-нибудь дать мне пошаговое руководство по подключению Apache Airflow к рабочему пространству Slack. Я создал webhook для своего канала и что мне с ним делать дальше?

С уважением

Ответы [ 3 ]

0 голосов
/ 04 декабря 2018

Попробуйте новый SlackWebhookOperator , который есть в версии Airflow> = 1.10.0

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg="Hi Wssup?"

slack_test =  SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    username='airflow_'+os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)

Примечание. Убедитесь, что в соединениях Airflow добавлено slack_connection как

.
host=https://hooks.slack.com/services/
0 голосов
/ 11 декабря 2018

Полный пример с использованием SlackWebhookOperator, как в @kaxil:

def slack_failed_task(task_name):
  failed_alert = SlackWebhookOperator(
    task_id='slack_failed_alert',
    http_conn_id='slack_connection',
    webhook_token=Variable.get("slackWebhookToken", default_var=""),
    message='@here DAG Failed {}'.format(task_name),
    channel='#epm-marketing-dev',
    username='Airflow_{}'.format(ENVIRONMENT_SUFFIX),
    icon_emoji=':red_circle:',
    link_names=True,
  )
  return failed_alert.execute

task_with_failed_slack_alerts = PythonOperator(
  task_id='task0',
  python_callable=<file to execute>,
  on_failure_callback=slack_failed_task,
  provide_context=True,
  dag=dag)

Как @Deep Nirmal Примечание. Убедитесь, что в соединениях Airflow добавлено slack_connection как

host=https://hooks.slack.com/services/
0 голосов
/ 28 августа 2018
SlackAPIPostOperator(
      task_id='failure',
      token='YOUR_TOKEN',
      text=text_message,
      channel=SLACK_CHANNEL,
      username=SLACK_USER)

Вышеуказанный простейший способ использовать Airflow для отправки сообщений в Slack.

Однако, если вы хотите настроить Airflow для отправки сообщений в Slack при сбоях задач, создайте функцию и добавьте on_failure_callback к вашим задачам с именем созданной функции slack. Пример ниже:

def slack_failed_task(contextDictionary, **kwargs):  
       failed_alert = SlackAPIPostOperator(
         task_id='slack_failed',
         channel="#datalabs",
         token="...",
         text = ':red_circle: DAG Failed',
         owner = '_owner',)
         return failed_alert.execute


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)

Использование SlackWebHook (работает только для воздушного потока> = 1.10.0): Если вы хотите использовать SlackWebHook, используйте SlackWebhookOperator аналогичным образом:

https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/slack_webhook_operator.py#L25

...