import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from workflow.task import some_task
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['jimin.park1@aig.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
'start_date': airflow.utils.dates.days_ago(0)
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('JiminTest', default_args=default_args, schedule_interval='*/1 * * * *', catchup=False)
t1 = PythonOperator(
task_id='Task1',
provide_context=True,
python_callable=some_task,
dag=dag
)
Сам фактический some_task просто добавляет метку времени к некоторому файлу. Как видно из конфигурационного файла dag, сама задача настроена на выполнение каждые 1 мин.
def some_task(ds, **kwargs):
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("test.txt", "a") as myfile:
myfile.write(current_time + '\n')
Я просто включил выходной файл и запустил веб-сервер без запуска планировщика. Эта функция вызывалась, и все добавлялось в файл при запуске веб-сервера. Когда я запускаю планировщик, в каждом цикле выполнения файл добавляется.
Я хочу, чтобы функция выполнялась каждую минуту, а не каждый цикл выполнения.