Мне нужно реализовать задачу ожидания в Airflow. Время ожидания должно быть около пары часов.
Во-первых, TimeDeltaSensor просто не работает.
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
task_id="sleep_for_11_min",
delta=timedelta(minutes=SLEEP_MINUTES_1ST),
)
Для ежедневного расписания, например:
schedule_interval='30 06 * * *'
Просто ждет до следующего графика:
[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come
Это до боли очевидно в коде: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43
(не говоря уже об известной ошибке при использовании расписания: None или @once)
Следующая попытка была с TimeSensor следующим образом:
SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeSensor(
task_id="sleep_for_11_min",
provide_context=True,
target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
trigger_rule=TriggerRule.NONE_FAILED
)
И это на самом деле хорошо работало, но в режиме poke требуется один рабочий на все время ожидания время. Я получил предложение использовать режим reschedule , но просто добавив:
mode='reschedule',
генерирует новое расписание при каждой проверке перепланирования и никогда не завершается так:
[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 15:38:51,306] {time_sensor.py:39} INFO - Checking if the time (14:49:51.079783) has come
[2020-01-15 15:38:51,331] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
...
[2020-01-15 15:41:00,587] {time_sensor.py:39} INFO - Checking if the time (14:52:00.202168) has come
[2020-01-15 15:41:00,614] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.....
(обратите внимание, что Airflow смешивает UT C и мой часовой пояс UTC + 1 в журнале здесь)
Следующая попытка - сгенерировать target_time для TimeSensor относительно исполнительного_дата группы DAG. Но несколько попыток не увенчались успехом, например:
task_target_time = '{{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}'
sleep_task_1 = TimeSensor(
task_id=task_id="sleep_for_11_min",
provide_context=True,
# target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
# target_time = task_target_time,
# target_time=datetime.strptime('{{ execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST) }}','%Y-%m-%dT%H:%M:%S'),
# target_time='{{ execution_date }}'+ timedelta(minutes=SLEEP_MINUTES_1ST),
target_time = ('{{ task_instance.execution_date }}' + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
poke_interval=120,
mode='reschedule',
timeout=10*60*60,
trigger_rule=TriggerRule.NONE_FAILED
)
В закомментированных строках (target_time ....) вы можете увидеть только некоторые комбинации, которые я пробовал. Некоторые сразу потерпели неудачу при создании DAG, а некоторые выдают такую ошибку во время выполнения:
[2020-01-15 17:56:39,388] {time_sensor.py:39} INFO - Checking if the time ({{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}) has come
[2020-01-15 17:56:39,389] {taskinstance.py:1058} ERROR - '>' not supported between instances of 'datetime.time' and 'str'
Traceback (most recent call last):
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
while not self.poke(context):
File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/time_sensor.py", line 40, in poke
return timezone.utcnow().time() > self.target_time
TypeError: '>' not supported between instances of 'datetime.time' and 'str'
[2020-01-15 17:56:39,390] {taskinstance.py:1089} INFO - Marking task as FAILED.
Мне кажется, я понимаю всю теорию - контекст задачи, включая execute_date, недоступен при создании оператора, только во время во время выполнения. Jinja возвращает объект Pendulum, который должен быть преобразован во время, но Jinja - это String, и я не получаю методы Pendulum во время создания.
Но ПОЧЕМУ так сложно создать простое:
sleep 1000
в Airflow.
(Airflow: v1.10.6, python 3.6.8)