У меня DAG работает каждые 30 минут.
Скажите, что это DAG (для простоты используются фиктивные операторы):
dag = DAG(
dag_id='My_dag',
default_args=args,
schedule_interval=timedelta(minutes=30),
max_active_runs=1,
catchup=False,
)
start = DummyOperator(task_id='start_task', dag=dag)
to_do = DummyOperator(task_id='to_do_task ', dag=dag)
end = DummyOperator(task_id='end_task ', dag=dag)
start >> to_do >> end
Теперь, один раз в день я хочу добавить другого оператора в рабочий процесс, который будет выполняться только при первом запуске в этот день .
Скажите, что это:
once = DummyOperator(task_id='once_task ', dag=dag)
start >> once
означает, что once
должен выполняться один раз каждые 24 часа, а остальные должны быть пропущены.
Я не могу сделать это с PythonBranchOperator
, как я не могу что-то вроде:
if execution_date == midnigt
потому что я не знаю, когда произойдет первое казнь. Это может быть 00:01, и это может быть 00:17 и т. Д.
Есть ли способ проверить, является ли это первым запуском за дату выполнения? Я звучу как TimeSensor
, но я не мог найти, как это сделать с документами. Можно ли совать тот же DAG?