Как запустить полный DAG несколько раз вместо того, чтобы повторять каждую задачу - PullRequest
0 голосов
/ 17 декабря 2018

У меня есть группа доступности базы данных с несколькими задачами, поставленными в очередь в простой и прямой зависимости.

import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from airflow.settings import log


def task1_cb(ds, **kwargs):
    log.info('Task1 Complete for date: %s' % kwargs.get('end_date'))


def task2_cb(ds, **kwargs):
    log.info('Task2 Complete for date: %s' % kwargs.get('end_date'))


def task3_cb(ds, **kwargs):
    log.info('Task3 Complete for date: %s' % kwargs.get('end_date'))


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'concurrency': 1,
    'retries': 0
}

dag = DAG(
    'sample_serial_dag',
    start_date=dt.datetime(2018,9,1),
    end_date=dt.datetime(2018,9,5),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True
)

task1 = PythonOperator(task_id='t1', provide_context=True, python_callable=task1_cb, dag=dag)
task2 = PythonOperator(task_id='t2', provide_context=True, python_callable=task2_cb, dag=dag)
task3 = PythonOperator(task_id='t3', provide_context=True, python_callable=task3_cb, dag=dag)

task1 >> task2 >> task3

Я хочу, чтобы она подтягивалась за прошедшие даты (выполняется @daily).Теперь я получаю, что Задача 1 запускается 5 раз, чтобы догнать 5 сроков исполнения, а после завершения переходит к Задаче 2, которая затем запускается 5 раз, и так далее.Поток выполнения следующий:

Task1 Complete for date: 2018-09-01
Task1 Complete for date: 2018-09-02
Task1 Complete for date: 2018-09-03
Task1 Complete for date: 2018-09-04
Task1 Complete for date: 2018-09-05

Task2 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-05

Task3 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-05

Task 1 in graph completes for all the past dates before continuing to Task 2

Что я хочу, это следующее:

Поток выполнения следующий:

Task1 Complete for date: 2018-09-01
Task2 Complete for date: 2018-09-01
Task3 Complete for date: 2018-09-01

Task1 Complete for date: 2018-09-02
Task2 Complete for date: 2018-09-02
Task3 Complete for date: 2018-09-02

Task1 Complete for date: 2018-09-03
Task2 Complete for date: 2018-09-03
Task3 Complete for date: 2018-09-03

Task1 Complete for date: 2018-09-04
Task2 Complete for date: 2018-09-04
Task3 Complete for date: 2018-09-04

Task1 Complete for date: 2018-09-05
Task2 Complete for date: 2018-09-05
Task3 Complete for date: 2018-09-05

1 Ответ

0 голосов
/ 17 декабря 2018

Причиной такого странного поведения была установка default_args от depends_on_past до Ложь .Я скопировал его из некоторого учебного или примерного кода, фактически не замечая и не зная, что он делает.Как для документов :

depen_on_past (bool) - при значении true экземпляры задач будут выполняться последовательно, полагаясь на расписание предыдущей задачи для успешного выполнения.Экземпляр задачи для start_date разрешен к запуску.

Установка его в True сделала свое дело и решила мою проблему.

...