С помощью этого поста Stackoverflow Я только что создал программу (ту, которая показана в посте), в которой, когда файл помещается в корзину S3, запускается задача в одном из моих работающих DAG изатем я выполняю некоторую работу, используя BashOperator.После того, как это сделано, хотя группа обеспечения доступности баз данных больше не находится в рабочем состоянии, а вместо этого переходит в состояние успеха, и если я хочу, чтобы он взял другой файл, мне нужно очистить все «Прошлое», «Будущее», «Восходящий поток», «Нисходящая деятельность.Я хотел бы сделать эту программу так, чтобы она всегда работала и каждый раз, когда новый файл помещался в корзину S3, программа запускала задачи.
Могу ли я продолжать использовать S3KeySenor для этого или мне нужнонайти способ настроить внешний триггер для запуска моего DAG?На данный момент мой S3KeySensor довольно бессмыслен, если он когда-либо будет запускаться только один раз.
from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 5, 29),
'email': ['something@here.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('s3_triggered_emr_cluster_dag', default_args=default_args, schedule_interval= '@once')
# This Activity runs a Python script that creates an AWS EMR cluster and then does EMR activity on the EMR cluster.
t2 = BashOperator(
task_id='create_emr_cluster_1',
bash_command='python /home/ec2-user/aws-python-sample/Create_EMR_Then_Do_EMR_Activities.py',
retries=1,
dag=dag)
t1 = BashOperator(
task_id='success_log',
bash_command='echo "Dag ran successfully" >> /home/ec2-user/s3_triggered_dag.txt',
dag=dag)
sensor = S3KeySensor(
task_id='new_s3_file_in_foobar-bucket',
bucket_key='*',
wildcard_match=True,
bucket_name='foobar-bucket',
s3_conn_id='s3://foobar-bucket',
timeout=18*60*60,
poke_interval=120,
dag=dag)
t1.set_upstream(sensor)
t2.set_upstream(t1)
Мне интересно, если это невозможно, потому что тогда это не будет Направленный ациклический граф, а скорее это будетиметь цикл, который повторял датчик -> t1 -> t2 -> датчик -> t1 -> t2 -> датчик -> ... повторять .
Обновление:
Мой вариант использования довольно прост, каждый раз, когда новый файл помещается в назначенное ведро AWS S3, я хочу, чтобы мой DAG запускался и запускал процесс выполнения различных задач.Задачи будут выполнять такие вещи, как создание нового кластера AWS EMR, извлечение файлов из корзины AWS S3, выполнение некоторых операций AWS EMR, а затем завершение работы кластера AWS EMR.Оттуда группа обеспечения доступности баз данных вернется в состояние ожидания, где она будет ожидать поступления новых файлов в корзину AWS S3, а затем повторять процесс бесконечно.