Airflow S3KeySensor - Как заставить его продолжать работать - PullRequest
0 голосов
/ 29 мая 2018

С помощью этого поста Stackoverflow Я только что создал программу (ту, которая показана в посте), в которой, когда файл помещается в корзину S3, запускается задача в одном из моих работающих DAG изатем я выполняю некоторую работу, используя BashOperator.После того, как это сделано, хотя группа обеспечения доступности баз данных больше не находится в рабочем состоянии, а вместо этого переходит в состояние успеха, и если я хочу, чтобы он взял другой файл, мне нужно очистить все «Прошлое», «Будущее», «Восходящий поток», «Нисходящая деятельность.Я хотел бы сделать эту программу так, чтобы она всегда работала и каждый раз, когда новый файл помещался в корзину S3, программа запускала задачи.

Могу ли я продолжать использовать S3KeySenor для этого или мне нужнонайти способ настроить внешний триггер для запуска моего DAG?На данный момент мой S3KeySensor довольно бессмыслен, если он когда-либо будет запускаться только один раз.

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 29),
    'email': ['something@here.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')

# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
    task_id='create_emr_cluster_1',
    bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
    retries=1,
    dag=dag)

t1 = BashOperator(
    task_id='success_log',
    bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
    dag=dag)

sensor = S3KeySensor(
    task_id='new_s3_file_in_foobar-bucket',
    bucket_key='*',
    wildcard_match=True,
    bucket_name='foobar-bucket',
    s3_conn_id='s3://foobar-bucket',
    timeout=18*60*60,
    poke_interval=120,
    dag=dag)

t1.set_upstream(sensor)
t2.set_upstream(t1)

Мне интересно, если это невозможно, потому что тогда это не будет Направленный ациклический граф, а скорее это будетиметь цикл, который повторял датчик -> t1 -> t2 -> датчик -> t1 -> t2 -> датчик -> ... повторять .

Обновление:

Мой вариант использования довольно прост, каждый раз, когда новый файл помещается в назначенное ведро AWS S3, я хочу, чтобы мой DAG запускался и запускал процесс выполнения различных задач.Задачи будут выполнять такие вещи, как создание нового кластера AWS EMR, извлечение файлов из корзины AWS S3, выполнение некоторых операций AWS EMR, а затем завершение работы кластера AWS EMR.Оттуда группа обеспечения доступности баз данных вернется в состояние ожидания, где она будет ожидать поступления новых файлов в корзину AWS S3, а затем повторять процесс бесконечно.

1 Ответ

0 голосов
/ 29 мая 2018

В Airflow нет концепции, которая сопоставлялась бы с постоянно работающей группой обеспечения доступности баз данных.Вы можете запускать DAG очень часто, например, каждые 1–5 минут, если это подходит для вашего варианта использования.

Главное здесь заключается в том, что S3KeySensor проверяет, пока не обнаружит, что первый файл существует в подстановочном пути ключа (или тайм-аут), то он запускается.Но когда появится второй, третий или четвертый файл, датчик S3 уже завершит работу для этого прогона DAG.Запланированный запуск не запланирован до следующего запуска DAG.(Идея зацикливания, которую вы описали, примерно эквивалентна тому, что делает планировщик при создании прогонов DAG, но не всегда.)

Внешний триггер определенно звучит как лучший подход для вашего варианта использования, независимо от того, происходит ли этот триггер черезКоманда trigger_dag CLI воздушного потока ($ airflow trigger_dag ...):

https://github.com/apache/incubator-airflow/blob/972086aeba4616843005b25210ba3b2596963d57/airflow/bin/cli.py#L206-L222

Или через API REST:

https://github.com/apache/incubator-airflow/blob/5de22d7fa0d8bc6b9267ea13579b5ac5f62c8bb5/airflow/www/api/experimental/endpoints.py#L41-L89

Обернитесь и вызовите функцию trigger_dag в общем (экспериментальном) API:

https://github.com/apache/incubator-airflow/blob/089c996fbd9ecb0014dbefedff232e8699ce6283/airflow/api/common/experimental/trigger_dag.py#L28-L67

Например, вы можете настроить лямбда-функцию AWS, которая вызывается, когдафайл попадает на S3, который запускает триггерный вызов DAG.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...