DAG воздушного потока работает каждую секунду, а не каждую минуту - PullRequest
0 голосов
/ 05 июня 2018

Я пытаюсь запланировать запуск своей группы доступности базы данных каждую минуту, но вместо этого она запускается каждую секунду.Исходя из всего, что я прочитал, мне просто нужно включить schedule_interval='*/1 * * * *', #..every 1 minute в мою группу обеспечения доступности баз данных, и это все, но это не работает.Вот простой пример, который я настроил, чтобы проверить его:

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 4),
    'schedule_interval': '*/1 * * * *', #..every 1 minute
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    max_active_runs=3,
    schedule_interval='*/1 * * * *', #..every 1 minute
    default_args=default_args,
)

test= BashOperator(
    task_id='test',
    bash_command="echo hey >> /home/ec2-user/schedule_test.txt",
    retries=1,
    dag=dag)

Обновление:

Поговорив с @Taylor Edmiston относительно его решения, мы поняли, что причинаМне нужно было добавить catchup=False, потому что я установил Airflow с помощью Pip, который использует устаревшую версию Airflow.Очевидно, что если вы используете Airflow из основной ветки своего репозитория , вам не нужно будет включать catchup=False, чтобы он запускался каждую минуту, как я пытался.Таким образом, хотя принятый ответ решил мою проблему, это своего рода не решение основной проблемы, обнаруженной @Taylor Edmiston.

Ответы [ 2 ]

0 голосов
/ 05 июня 2018

Ваш schedule_interval на DAG является правильным: */1 * * * * равен каждую минуту .

Вы также можете удалить start_date и schedule_interval из default_args, так как они 'избыточно с kwargs, предоставленными DAG.

Если вы изменили расписание с момента создания этого DAG, возможно, Airflow запутался.Попробуйте удалить DAG в базе данных, а затем перезапустить планировщик и веб-сервер.Если вы находитесь в основной ветке Airflow, это так же просто, как $ airflow delete_dag my_dag;в противном случае связанный ответ объясняет, как это сделать в других версиях.

Я свел ваш код к этому, чтобы проверить, и он определенно выполняет один прогон DAG в минуту при запуске внутри главной ветви Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    schedule_interval='*/1 * * * *',
    default_args=default_args,
)

test = BashOperator(
    task_id='test',
    bash_command='echo "one minute test"',
    dag=dag,
)

DAG работает:

enter image description here

0 голосов
/ 05 июня 2018

Попробуйте добавить catchup=False в DAG().Возможно, ваш DAG пытается выполнить обратную засыпку из-за объявленного вами start_date.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...