У меня есть группа DAG с двумя задачами: Task A
и Task B
. У меня возникла проблема при написании условия DAG
Мои условия следующие:
1) Задача A должна быть запущена, как только новые данные появятся в таблице (AWS AThena), и эта проверка должно быть сделано после 9 утра CET Daily. 2) При успешном выполнении задачи A следует запустить TAsk B.
Мой код
from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('RS_Input_Cleansing', default_args=default_args, schedule_interval='@daily')
wait_for_data_partition = AwsGlueCatalogPartitionSensor(
task_id='Task A',
database_name='region',
table_name='table',
expression='load_date={{ ds_nodash }}',
timeout=60,
dag=dag)
execute_sql = PostgresOperator(
task_id='Task B',
postgres_conn_id='DB_CONN',
sql="/sql/flow/execute.sql",
params={'limit': '50'},
dag=dag
execute_sql.set_upstream(wait_for_data_partition)
Как это можно сделать?