Как вызвать задачу Airflow, только когда новый раздел / данные доступны в таблице AWS athena с помощью DAG в python? - PullRequest
9 голосов
/ 16 апреля 2020

У меня есть сценарий, подобный приведенному ниже:

  1. Запускать Task 1 и Task 2 только тогда, когда для них доступны новые данные в исходной таблице (Афина). Триггер для Задачи1 и Задачи2 должен произойти, когда новый раздел данных за день.
  2. Триггер Task 3 только при завершении Task 1 и Task 2
  3. Триггер Task 4 только завершение Task 3

enter image description here

Мой код

from airflow import DAG

from airflow.contrib.sensors.aws_glue_catalog_partition_sensor import AwsGlueCatalogPartitionSensor
from datetime import datetime, timedelta

from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task1_partition_exists',
    database_name='DB',
    table_name='Table1',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

Athena_Trigger_for_Task2 = AwsGlueCatalogPartitionSensor(
    task_id='athena_wait_for_Task2_partition_exists',
    database_name='DB',
    table_name='Table2',
    expression='load_date={{ ds_nodash }}',
    timeout=60,
    dag=dag)

execute_Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task1.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task2 = PostgresOperator(
    task_id='Task2',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task2.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)



execute_Task3 = PostgresOperator(
    task_id='Task3',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task3.sql",
    params={'limit': '50'},
    trigger_rule='all_success',
    dag=dag
)

execute_Task4 = PostgresOperator(
    task_id='Task4',
    postgres_conn_id='REDSHIFT_CONN',
    sql="/sql/flow/Task4",
    params={'limit': '50'},
    dag=dag
)



execute_Task1.set_upstream(Athena_Trigger_for_Task1)
execute_Task2.set_upstream(Athena_Trigger_for_Task2)

execute_Task3.set_upstream(execute_Task1)
execute_Task3.set_upstream(execute_Task2)

execute_Task4.set_upstream(execute_Task3)

Что является оптимальным способ достижения этого?

1 Ответ

1 голос
/ 22 апреля 2020

Я полагаю, что ваш вопрос решает две основные проблемы:

  1. забывает настраивать schedule_interval явным образом, поэтому @daily настраивает то, чего вы не ожидаете.
  2. Как правильно запустить и повторить выполнение dag, когда вы зависите от внешнего события, чтобы завершить выполнение

краткий ответ: явно установите ваш schedule_interval с заданием cron форматируйте и используйте операторы датчиков, чтобы время от времени проверять

default_args={
        'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
     ....
    poke_time= 60*5 #<---- set a poke_time in seconds
    dag=dag)

, где startime - время начала вашей ежедневной задачи, endtime - последнее время дня. Вы должны проверить, было ли событие выполнено, прежде чем пометить его как неудачное, и poke_time - это интервал, который sensor_operator проверит, произошло ли событие.

Как явным образом обращаться к заданию cron всякий раз, когда вы устанавливаете dag на @daily, как вы это делали:

dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')

из документы , вы можете видеть, что на самом деле делаете: @daily - Run once a day at midnight

Что теперь имеет смысл, почему вы получаете ошибку тайм-аута, и происходит сбой через 5 минут, потому что вы установили 'retries': 1 и 'retry_delay': timedelta(minutes=5). Таким образом, он пытается запустить даг в полночь, он терпит неудачу. повторяет попытку через 5 минут и снова завершается ошибкой, поэтому он помечается как неудачный.

Таким образом, в основном @daily run устанавливает неявное задание cron:

@daily -> Run once a day at midnight -> 0 0 * * *

Формат задания cron имеет формат ниже, и вы устанавливаете значение * всякий раз, когда вы хотите сказать «все».

Minute Hour Day_of_Month Month Day_of_Week

Таким образом, @daily в основном говорит, что запускайте это каждые: минуты 0 часа 0 из all days_of_month всех месяцев всех days_of_week

Таким образом, ваше дело будет запускаться каждую минуту: 0 часа 10 всех days_of_month всех_months всех days_of_week. Это переводит в формате задания cron:

0 10 * * *

Как правильно запустить и повторить выполнение dag, когда вы зависите от внешнего события, чтобы завершить выполнение

  1. Вы можете вызвать снижение потока воздуха от внешнего события, используя команду airflow trigger_dag. это было бы возможно, если бы вы могли запускать лямбда-скрипт / python скрипт для нацеливания на ваш экземпляр воздушного потока.

  2. Если вы не можете вызвать dag извне, используйте датчик Оператор, как OP, установите для него значение poke_time и установите достаточно большое количество повторных попыток.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...