Как активировать DAG Airflow для запуска триггера задачи в 9:00 CET в зависимости от условия? - PullRequest
3 голосов
/ 17 апреля 2020

У меня есть группа DAG с двумя задачами: Task A и Task B. У меня возникла проблема при написании условия DAG

Мои условия следующие:

1) Задача A должна быть запущена, как только новые данные появятся в таблице (AWS AThena), и эта проверка должно быть сделано после 9 утра CET Daily. 2) При успешном выполнении задачи A следует запустить TAsk B.

Мой код

from airflow import DAG
from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('RS_Input_Cleansing', default_args=default_args, schedule_interval='@daily')

wait_for_data_partition = AwsGlueCatalogPartitionSensor(
    task_id='Task A',
    database_name='region',
    table_name='table',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_sql = PostgresOperator(
    task_id='Task B',
    postgres_conn_id='DB_CONN',
    sql="/sql/flow/execute.sql",
    params={'limit': '50'},
    dag=dag

execute_sql.set_upstream(wait_for_data_partition)

Как это можно сделать?

...