Воздушный поток: выполнение задачи ожидания (сна) - эффективно - PullRequest
1 голос
/ 15 января 2020

Мне нужно реализовать задачу ожидания в Airflow. Время ожидания должно быть около пары часов.

Во-первых, TimeDeltaSensor просто не работает.

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
            task_id="sleep_for_11_min",
            delta=timedelta(minutes=SLEEP_MINUTES_1ST),                    
    )

Для ежедневного расписания, например:

schedule_interval='30 06 * * *'

Просто ждет до следующего графика:

[2020-01-15 18:10:21,800] {time_delta_sensor.py:45} INFO - Checking if the time (2020-01-16 06:41:00+00:00) has come

Это до боли очевидно в коде: https://github.com/apache/airflow/blob/master/airflow/sensors/time_delta_sensor.py#L43

(не говоря уже об известной ошибке при использовании расписания: None или @once)

Следующая попытка была с TimeSensor следующим образом:

 SLEEP_MINUTES_1ST = 11
 sleep_task_1 = TimeSensor(
           task_id="sleep_for_11_min",
           provide_context=True,
           target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
           trigger_rule=TriggerRule.NONE_FAILED    
        )

И это на самом деле хорошо работало, но в режиме poke требуется один рабочий на все время ожидания время. Я получил предложение использовать режим reschedule , но просто добавив:

mode='reschedule',

генерирует новое расписание при каждой проверке перепланирования и никогда не завершается так:

[2020-01-15 15:36:42,818] {time_sensor.py:39} INFO - Checking if the time (14:47:42.707565) has come
[2020-01-15 15:36:42,981] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
....
[2020-01-15 15:38:51,306] {time_sensor.py:39} INFO - Checking if the time (14:49:51.079783) has come
[2020-01-15 15:38:51,331] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
...
[2020-01-15 15:41:00,587] {time_sensor.py:39} INFO - Checking if the time (14:52:00.202168) has come
[2020-01-15 15:41:00,614] {taskinstance.py:1054} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
.....

(обратите внимание, что Airflow смешивает UT C и мой часовой пояс UTC + 1 в журнале здесь)

Следующая попытка - сгенерировать target_time для TimeSensor относительно исполнительного_дата группы DAG. Но несколько попыток не увенчались успехом, например:

task_target_time = '{{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}' 
sleep_task_1 = TimeSensor(
          task_id=task_id="sleep_for_11_min",
          provide_context=True,
#         target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
#         target_time = task_target_time,
#         target_time=datetime.strptime('{{ execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST) }}','%Y-%m-%dT%H:%M:%S'),                        
#         target_time='{{ execution_date }}'+ timedelta(minutes=SLEEP_MINUTES_1ST),
          target_time = ('{{ task_instance.execution_date }}' + timedelta(minutes=SLEEP_MINUTES_1ST)).time(),
              poke_interval=120,
              mode='reschedule',
              timeout=10*60*60,
              trigger_rule=TriggerRule.NONE_FAILED    
        )

В закомментированных строках (target_time ....) вы можете увидеть только некоторые комбинации, которые я пробовал. Некоторые сразу потерпели неудачу при создании DAG, а некоторые выдают такую ​​ошибку во время выполнения:

[2020-01-15 17:56:39,388] {time_sensor.py:39} INFO - Checking if the time ({{ macros.datetime.fromtimestamp((execution_date + macros.timedelta(minutes=SLEEP_MINUTES_1ST).timestamp()) }}) has come
[2020-01-15 17:56:39,389] {taskinstance.py:1058} ERROR - '>' not supported between instances of 'datetime.time' and 'str'
Traceback (most recent call last):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
    while not self.poke(context):
  File "/data/airflow_ak/.direnv/lib/python3.6/site-packages/airflow/sensors/time_sensor.py", line 40, in poke
    return timezone.utcnow().time() > self.target_time
TypeError: '>' not supported between instances of 'datetime.time' and 'str'
[2020-01-15 17:56:39,390] {taskinstance.py:1089} INFO - Marking task as FAILED.

Мне кажется, я понимаю всю теорию - контекст задачи, включая execute_date, недоступен при создании оператора, только во время во время выполнения. Jinja возвращает объект Pendulum, который должен быть преобразован во время, но Jinja - это String, и я не получаю методы Pendulum во время создания.

Но ПОЧЕМУ так сложно создать простое:

sleep 1000

в Airflow.

(Airflow: v1.10.6, python 3.6.8)

Ответы [ 3 ]

1 голос
/ 16 января 2020

TimeSensor входит в перепланирование l oop, потому что target_time пересчитывается при каждой проверке ограничения на другое значение. Это приводит к тому, что ограничение никогда не будет выполнено.

    target_time=(timezone.utcnow()+timedelta(minutes=SLEEP_MINUTES_1ST)).time(),

При использовании TimeSensor таким образом вы должны установить для target_time значение времени, которое является последним временем, которое вы ожидаете, чтобы условие было выполнено доволен.

Я предлагаю использовать TimeDeltaSensor в режиме reschedule. Можно дождаться запланированного задания, а затем перепланировать его, если оно выполнит проверку ограничения или иным образом выполнит ее.

SLEEP_MINUTES_1ST = 11
sleep_task_1 = TimeDeltaSensor(
    task_id="sleep_for_11_min",
    delta=timedelta(minutes=SLEEP_MINUTES_1ST),
    mode='reschedule'               
)

Вы также можете создать подкласс BaseSensorOperator, аналогичный TimeSensor, который выполняет Проверка живости, чтобы увидеть, была ли задача снята со сна. Например,

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
from airflow.models.taskreschedule import TaskReschedule
from airflow.utils.session import provide_session


XCOM_KEY='start_date'

class ReleaseProbe(BaseSensorOperator):
    """
    Waits until the time of job is released from sleep.
    :param sleep_duration: sleep duration of job before it runs 
    :type delta: datetime.timedelta
    """

    @apply_defaults
    def __init__(self, sleep_duration, *args, **kwargs):
        super(ReleaseProbe, self).__init__(*args, **kwargs)
        self.sleep_duration = sleep_duration

    def poke(self, context):
        self.log.info('Checking if task is released after (%s) sleep, execution date is:  %s', self.sleep_duration)

        ti = context['ti']

        start_date = ti.xcom_pull(key=XCOM_KEY, task_id=ti.task_id)
        if not start_date:
            ti.xcom_push(key=XCOM_KEY, value=timezone.now())
            return False

        return timezone.utcnow() - start_date > self.sleep_duration
0 голосов
/ 17 января 2020

Вот датчик воздушного потока, который «спит», так как я предполагаю, что TimeDeltaSensor должен находиться в спящем режиме.

Лучше всего использовать в режиме «перепланирования».

Он спит относительно текущего времени, которое является началом экземпляра задачи, например, оператор TimeSleepSensor, и по умолчанию он "высовывает" только один раз после периода продолжительности сна и имеет тайм-аут по умолчанию, который истекает через некоторое время после запроса sleep_duration на случай, если что-то произошло, что вызвало сбой действия poke.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta

class TimeSleepSensor(BaseSensorOperator):
    """
    Waits for specified time interval relative to task instance start

    :param sleep_duration: time after which the job succeeds
    :type sleep_duration: datetime.timedelta
    """

    @apply_defaults
    def __init__(self, sleep_duration, *args, **kwargs):
        super(TimeSleepSensor, self).__init__(*args, **kwargs)
        self.sleep_duration = sleep_duration
        self.poke_interval = kwargs.get('poke_interval',int(sleep_duration.total_seconds()))
        self.timeout = kwargs.get('timeout',int(sleep_duration.total_seconds()) + 30)


    def poke(self, context):
        ti = context["ti"]

        sensor_task_start_date = ti.start_date          
        target_time = sensor_task_start_date + self.sleep_duration

        self.log.info("Checking if the target time ({} - check:{}) has come - time to go: {}, start: {}, initial sleep_duration: {}"
                    .format(target_time, (timezone.utcnow() > target_time), (target_time-timezone.utcnow()), sensor_task_start_date, self.sleep_duration)
        )

        return timezone.utcnow() > target_time

Использование просто:

    sleep_task = TimeSleepSensor(
                        task_id="sleep_task",
                        sleep_duration=timedelta(minutes=1800),  
                        mode='reschedule'
    )
0 голосов
/ 16 января 2020

Ну, это не совсем решение вашей проблемы, а скорее альтернативный (проверенный) способ.
Что вы можете сделать, это просто создать оператор bash и вызвать sleep. Я полагаю, что это займет только поток, как команда sleep на терминале.

from airflow.operators.bash_operator import BashOperator
sleep_task = BashOperator(
  task_id='sleep_for_eleven_minutes',
  bash_command='sleep 660',
  dag=dag,
)

Таким образом, ваша функциональность достигается самым простым способом без использования каких-либо сложных операторов.

...