Воздушный поток запускается два прогона DAG при первом включении - PullRequest
0 голосов
/ 25 октября 2019

Когда я в первый раз загружаю веб-сервер и планировщик Airflow 25 октября около 17:23 и включаю DAG, я вижу, что он запускает два запуска: 23 октября и 24 октября:

RUN 1 -> 10-23T17:23
RUN 2 -> 10-24T17:23

Вот моя конфигурация DAG:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2019-01-01',
    'retries': 0,
}
dag = DAG(
    'my_script',
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
    catchup=False,
)

Поскольку он прошел start_date + schedule_interval и я установил catchup=False, я ожидал бы, что он сразу же запустит один прогон DAG, однако яне ожидал, что он запустит два.

  • Почему выполняются два прогона DAG?
  • Как я могу предотвратить это поведение?

1 Ответ

1 голос
/ 26 октября 2019

Я не уверен, но это мое лучшее предположение -

Вкратце, возможно, именно так построен воздушный поток, и обходной путь может изменить ваш start_date на вчерашний.

TL; DR

Я согласен с тем, что при включении 10 даг на 10-24 звучат более естественно.

Однако, согласно вашим пробежкам, RUN 1 - 10-23. Это говорит мне о том, что инициализация первого запуска не правильна, и я изучил код планировщика.

И у меня есть сомнения в этой строке.

https://github.com/apache/airflow/blob/68b8ec5f415795e4fa4ff7df35a3e75c712a7bad/airflow/jobs/scheduler_job.py#L603

Это внутри функции, которая создает прогон Dag и устанавливает дату начала прогона.

# The logic is that we move start_date up until
# one period before, so that timezone.utcnow() is AFTER
# the period end, and the job can be created...
now = timezone.utcnow()

# This returns current time + schedule_interval. In your example, this will be tomorrow.
next_start = dag.following_schedule(now)

# This returns current time - schedule_interval. In your example, this will be yesterday.
last_start = dag.previous_schedule(now)

# tomorrow <= today should return False 
if next_start <= now:
    new_start = last_start
else:
    # and this will return last_start - schedule_interval which means 2 days ago.  
    # wondering if this is intended to be dag.previous_schedule(next_start)???
    new_start = dag.previous_schedule(last_start) 
...