Воздушный поток: как убедиться, что DAG запускается через каждые 5 минут? - PullRequest
0 голосов
/ 20 сентября 2018

Я исследую Apache Airflow.Я использую метод, который вставляет запись в MySQL.

Я запланировал запуск DAG через КАЖДЫЕ 5 минут, но, похоже, этого не происходит, поскольку отметка времени MYSQL говорит о том, что задача MySQL выполняется много раз в течение 5 минут.

enter image description here

Как видите, вставка записи происходит в течение нескольких минут.Ниже мой код:

import datetime as dt

from airflow import DAG
from airflow.hooks.mysql_hook import MySqlHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def fetch_data_mysql():
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')
    sql = 'SELECT * from random_table'
    sql = "INSERT INTO random_table(text) VALUES ('Hi Adnan')"
    print('INSERT MYSQL RESULT')
    # results = mysql_hook.get_records(sql)
    # results = mysql_hook.run(sql, autocommit=True, parameters=('Hi Addu',))
    mysql_hook.run(sql, autocommit=True)

def print_world():
    print('world')
    return 'WORLD IN SEPTEMBER'


default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2018, 9, 11),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2),
}

with DAG('airflow_tutorial_v01',
         default_args=default_args,
         schedule_interval='0/5 * * * *',
         ) as dag:
    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)
    mysql_task = PythonOperator(task_id='mysql_tut', python_callable=fetch_data_mysql)

print_hello >> sleep >> print_world >> mysql_task

Я использую v1.10.0.

Ссылка на журнал дается здесь: - https://www.dropbox.com/s/f0g64mhi8sgzlvw/my_simple_dag.py.log?dl=0

Ответы [ 2 ]

0 голосов
/ 21 сентября 2018

Твой даг засыпает.Если вы проверяете журналы, даты его выполнения 2018-09-20 00:15:00+00:00, 2018-09-20 00:20:00+00:00, 2018-09-20 00:25:00+00:00 и т. Д.

Добавьте следующее к вашему default_args:

'catchup_by_default': False

Ваш default_args должен выглядеть так:

default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2018, 9, 11),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=2),
    'catchup_by_default': False,
}
0 голосов
/ 20 сентября 2018

Попробуйте изменить расписание cron с 0/5 * * * * на */5 * * * *.Последний - каждые пять минут, тогда как первый, кажется, имеет нестандартный синтаксис cron согласно crontab.guru

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...