Вы можете использовать TimeSensor для задержки выполнения задач в группе обеспечения доступности баз данных.Я не думаю, что вы можете изменить фактический execution_date
, если вы не можете описать поведение как cron.
Если вы хотите, чтобы эта задержка применялась только к подмножеству запланированных запусков DAG, вы можете использовать BranchPythonOperator , чтобы сначала проверить, является ли execution_date
одним из тех дней, когда вы хотите отставание.Если это так, то возьмите ветку с датчиком.В противном случае двигаться без него.
В качестве альтернативы, особенно если вы планируете иметь такое поведение более чем в одной группе обеспечения доступности баз данных, вы можете написать модифицированную версию датчика.Это может выглядеть примерно так:
def poke(self, context):
if should_delay(context['execution_date']):
self.log.info('Checking if the time (%s) has come', self.target_time)
return timezone.utcnow().time() > self.target_time
else:
self.log.info('Not one of those days, just run')
return True
Вы можете сослаться на код для существующего датчика времени в https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/sensors/time_sensor.py#L38-L40.