Я полагаю, что ваш вопрос решает две основные проблемы:
- забывает настраивать
schedule_interval
явным образом, поэтому @daily настраивает то, чего вы не ожидаете. - Как правильно запустить и повторить выполнение dag, когда вы зависите от внешнего события, чтобы завершить выполнение
краткий ответ: явно установите ваш schedule_interval с заданием cron форматируйте и используйте операторы датчиков, чтобы время от времени проверять
default_args={
'retries': (endtime - starttime)*60/poke_time
}
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='0 10 * * *')
Athena_Trigger_for_Task1 = AwsGlueCatalogPartitionSensor(
....
poke_time= 60*5 #<---- set a poke_time in seconds
dag=dag)
, где startime
- время начала вашей ежедневной задачи, endtime
- последнее время дня. Вы должны проверить, было ли событие выполнено, прежде чем пометить его как неудачное, и poke_time
- это интервал, который sensor_operator
проверит, произошло ли событие.
Как явным образом обращаться к заданию cron всякий раз, когда вы устанавливаете dag на @daily
, как вы это делали:
dag = DAG('Trigger_Job', default_args=default_args, schedule_interval='@daily')
из документы , вы можете видеть, что на самом деле делаете: @daily - Run once a day at midnight
Что теперь имеет смысл, почему вы получаете ошибку тайм-аута, и происходит сбой через 5 минут, потому что вы установили 'retries': 1
и 'retry_delay': timedelta(minutes=5)
. Таким образом, он пытается запустить даг в полночь, он терпит неудачу. повторяет попытку через 5 минут и снова завершается ошибкой, поэтому он помечается как неудачный.
Таким образом, в основном @daily run устанавливает неявное задание cron:
@daily -> Run once a day at midnight -> 0 0 * * *
Формат задания cron имеет формат ниже, и вы устанавливаете значение *
всякий раз, когда вы хотите сказать «все».
Minute Hour Day_of_Month Month Day_of_Week
Таким образом, @daily в основном говорит, что запускайте это каждые: минуты 0 часа 0 из all days_of_month всех месяцев всех days_of_week
Таким образом, ваше дело будет запускаться каждую минуту: 0 часа 10 всех days_of_month всех_months всех days_of_week. Это переводит в формате задания cron:
0 10 * * *
Как правильно запустить и повторить выполнение dag, когда вы зависите от внешнего события, чтобы завершить выполнение
Вы можете вызвать снижение потока воздуха от внешнего события, используя команду airflow trigger_dag
. это было бы возможно, если бы вы могли запускать лямбда-скрипт / python скрипт для нацеливания на ваш экземпляр воздушного потока.
Если вы не можете вызвать dag извне, используйте датчик Оператор, как OP, установите для него значение poke_time и установите достаточно большое количество повторных попыток.