Задержка сна в Airflow backfill Dag - PullRequest
0 голосов
/ 18 июня 2020

Для заданий обратной засыпки я пробовал датчик timedelta, но, похоже, он проверяет время выполнения + 60 минут вместо текущего времени + 60 минут. Есть ли способ вызвать текущее время + 60-минутную задержку для заданий обратной засыпки?

По сути, эта задержка не позволит мне достичь предела скорости API. В настоящее время используется следующая задача для задержки, которая, похоже, не работает.

    task_id='wait',
    delta=timedelta(minutes=60),
    dag=dag
)

Ответы [ 2 ]

1 голос
/ 18 июня 2020

Вам понадобится собственный пользовательский оператор или датчик и используйте атрибут start_date экземпляра задачи. Использование start_date - единственное изменение, которое вам нужно сделать из TimeDeltaSensor в библиотеке.

Если вы скопируете исходный код TimeDeltaSensor, вы можете переопределить метод poke примерно таким:

        # ...
        target_dttm = context["task_instance"].start_date
        target_dttm += self.delta
        return timezone.utcnow() > target_dttm

Добавлено

Читая ваш вопрос еще раз, я вижу, что вам может просто понадобиться оператор со сном при его выполнении. Или вызываемый python, в котором есть шаблон сна, и pu sh, который через PythonOperator ... что-то вроде

def hit_api():
    pass

def hit_and_back_off:
    while True:
        try:
            hit_api()
        except APILimitError:
            sleep(<SOME SLEEP>)
        except e:
            raise
0 голосов
/ 18 июня 2020

Самое простое решение, которое я нашел, - это иметь еще одну группу DAG, которая запускает запланированные засыпки

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...