from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
# Task-level defaults; handed to the DAG constructor through its
# ``default_args`` keyword.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2018, 5, 29),
    # An address is configured, but both e-mail notification flags are off.
    "email": ["something@here.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    # Retry policy: 5 retries with a 5-minute delay between attempts.
    "retries": 5,
    "retry_delay": timedelta(minutes=5),
}
# The DAG all tasks below attach to; scheduled with the '@once' preset and
# using the shared task defaults from ``default_args``.
dag = DAG(
    dag_id="s3_dag_airflow",
    schedule_interval="@once",
    default_args=default_args,
)
# Shell task that appends a success marker line to a text file.
# NOTE(review): the output path is relative, so the file lands in whatever
# working directory the command is executed from — confirm that is intended.
t1 = BashOperator(
    dag=dag,
    task_id="success_log",
    bash_command='echo "Dag ran successfully" >> ./s3_triggered_dag.txt',
)
# Sensor task that polls S3 through the 'aws_default' connection.
# NOTE(review): bucket_name='*' looks like a placeholder — wildcard matching
# is documented for bucket_key, not for the bucket itself. Confirm the real
# bucket (the task_id hints at "foobar-bucket") before relying on this.
sensor = S3KeySensor(
    dag=dag,
    task_id="new_s3_file_in_foobar-bucket",
    s3_conn_id="aws_default",
    bucket_name="*",
    bucket_key="*",
    wildcard_match=True,
    poke_interval=120,  # presumably seconds between checks — see sensor docs
    timeout=18 * 60 * 60,  # 64800; presumably seconds, i.e. 18 hours
)
# Task ordering: the S3 sensor is upstream of the success-log task.
sensor.set_downstream(t1)