У меня есть группа доступности базы данных с несколькими задачами, поставленными в очередь в простой и прямой зависимости.
import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.settings import log
def task1_cb(ds, **kwargs):
log.info('Task1 Complete for date: %s' % kwargs.get('end_date'))
def task2_cb(ds, **kwargs):
log.info('Task2 Complete for date: %s' % kwargs.get('end_date'))
def task3_cb(ds, **kwargs):
log.info('Task3 Complete for date: %s' % kwargs.get('end_date'))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'concurrency': 1,
'retries': 0
}
dag = DAG(
'sample_serial_dag',
start_date=dt.datetime(2018,9,1),
end_date=dt.datetime(2018,9,5),
default_args=default_args,
schedule_interval='@daily',
catchup=True
)
task1 = PythonOperator(task_id='t1', provide_context=True, python_callable=task1_cb, dag=dag)
task2 = PythonOperator(task_id='t2', provide_context=True, python_callable=task2_cb, dag=dag)
task3 = PythonOperator(task_id='t3', provide_context=True, python_callable=task3_cb, dag=dag)
task1 >> task2 >> task3
Я хочу, чтобы она подтягивалась за прошедшие даты (выполняется @daily
).Теперь я получаю, что Задача 1 запускается 5 раз, чтобы догнать 5 сроков исполнения, а после завершения переходит к Задаче 2, которая затем запускается 5 раз, и так далее.Поток выполнения следующий:
Task1 Complete for date: 2018-09-01
Task1 Complete for date: 2018-09-02
Task1 Complete for date: 2018-09-03
Task1 Complete for date: 2018-09-04
Task1 Complete for date: 2018-09-05
Task2 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-05
Task3 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-05

Что я хочу, это следующее:
Поток выполнения следующий:
Task1 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-01
Task1 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-02
Task1 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-03
Task1 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-04
Task1 Complete for date: 2018-09-05
Task2 Complete for date: 2018-09-05
Task3 Complete for date: 2018-09-05