Можно ли добавить задержку к расписанию в потоке воздуха? - PullRequest
0 голосов
/ 30 ноября 2018

У меня есть конвейер, который я хочу запускать каждый день, но я бы хотел, чтобы дата выполнения была отложенной.То есть в день X я хочу, чтобы дата исполнения была X-3.Возможно ли что-то подобное?

Ответы [ 2 ]

0 голосов
/ 04 декабря 2018

Похоже, вы используете execution_date в качестве переменной в логике конвейера.Например, для обработки данных, которые на 3 дня старше, чем execution_date.Таким образом, вместо execution_date для задержки на 3 дня вы можете вычесть задержку из execution_date и использовать результат в своей конвейерной логике.Воздушный поток предоставляет несколько способов сделать это:

  1. Шаблоны: {{ execution_date - macros.timedelta(days=3) }}.Так, например, параметр bash_command в BashOperator может быть bash_command='echo Processing date: {{ execution_date - macros.timedelta(days=3) }} '
  2. Вызываемый Python PythonOperator: Определить вызываемый объект как def func(execution_date, **kwargs): ... и установить параметр PythonOperator provide_context=True,Параметр execution_date func() будет установлен на текущую дату выполнения (объект datetime) при вызове.Итак, внутри func() вы можете сделать processing_date = execution_date - timedelta(days=3).
  3. Параметр context датчиков : методы poke() и execute() любого датчика имеют contextпараметр, который является параметром для всех макросов, включая execution_date.Таким образом, в этих методах вы можете выполнить processing_date = context['execution_date'] - timedelta(days=3).

Принудительно указывать дату выполнения с задержкой просто неправильно.Поскольку, согласно логике Airflow, дата выполнения работающей в данный момент группы доступности базы данных обычно может иметь задержку, только если она догоняет (bakcfilling).

0 голосов
/ 01 декабря 2018

Вы можете использовать TimeSensor для задержки выполнения задач в группе обеспечения доступности баз данных.Я не думаю, что вы можете изменить фактический execution_date, если вы не можете описать поведение как cron.

Если вы хотите, чтобы эта задержка применялась только к подмножеству запланированных запусков DAG, вы можете использовать BranchPythonOperator , чтобы сначала проверить, является ли execution_date одним из тех дней, когда вы хотите отставание.Если это так, то возьмите ветку с датчиком.В противном случае двигаться без него.

В качестве альтернативы, особенно если вы планируете иметь такое поведение более чем в одной группе обеспечения доступности баз данных, вы можете написать модифицированную версию датчика.Это может выглядеть примерно так:

def poke(self, context):
    if should_delay(context['execution_date']):
        self.log.info('Checking if the time (%s) has come', self.target_time)
        return timezone.utcnow().time() > self.target_time
    else:
        self.log.info('Not one of those days, just run')
        return True

Вы можете сослаться на код для существующего датчика времени в https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/sensors/time_sensor.py#L38-L40.

...