DAG Airflow запускается несколько раз за одну минуту, хотя он должен был запускаться каждые 5 минут - PullRequest
1 голос
/ 14 июня 2019

Я создал группу обеспечения доступности баз данных, выполнение которой было запланировано на каждые 5 минут с использованием синтаксиса cron. Кроме того, для этого дамба был создан пул с одним слотом.

Я попытался перезапустить сервер / планировщик и сбросить базу данных. В настоящее время DAG работает по времени UTC. Кроме того, я попытался установить местный часовой пояс, который называется «Европа / Минск» (UTC + 3), и он не дает никакого эффекта.

import random
import time
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'pool': 'download',
    # 'priority_weight': 10,
    # 'queue': 'bash_queue',
}

params = {
    'table': 'api_avitoimage',
}

dag = DAG(
    dag_id='test_download_avitoimage',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
)


def sleep_for_a_bit(random_base):
    time.sleep(random_base)

with dag:

    download = BashOperator(
        task_id='download',
        bash_command='/usr/bin/python3 /home/artur/downloader.py --table {{ params.table }}',
        params=params,
        dag=dag)

    sleep = PythonOperator(
        task_id='sleep_for_a_bit',
        python_callable=sleep_for_a_bit,
        op_kwargs={'random_base': random.uniform(0, 1)},
        dag=dag,
    )

    download >> sleep

Проблема: DAG работает ~ 2-3 раза в минуту, что является совершенно неправильным исполнением. РЕДАКТИРОВАНИЕ: Бывает, что одновременно работает 16/16 активных DAG. Но я не могу понять, откуда появилось это "магическое число 16".

1 Ответ

2 голосов
/ 14 июня 2019

По умолчанию Airflow пытается завершить все «пропущенные» группы DAG, начиная с start_date. Поскольку для start_date установлено значение airflow.utils.dates.days_ago(2), Airflow будет запускать DAG 576 раз, прежде чем начнет запуск DAG по расписанию. Вы можете отключить его, добавив catchup = False в определение DAG (не default_args).

Магическое число 16 происходит из параметра max_active_runs_per_dag = 16, который установлен по умолчанию.

...