У меня есть 30 отдельных задач в dag, они не зависят друг от друга.Задачи запускают один и тот же код.Единственное различие заключается в объеме данных, некоторые задачи будут завершены в секундах, некоторые задачи займут 2 часа или более.
Проблема во время захвата, задачи, которые заканчиваются в секундах, блокируются задачами, которые занимают часызакончите, прежде чем они перейдут к следующей дате исполнения.
Я могу разбить их на отдельные даги, но это кажется глупым, и в будущем число задач возрастет до большего.
Есть лилюбой способ запустить задачи в одном и том же пакете в разное время выполнения?Как и в случае завершения задачи, выберите следующую дату выполнения, независимо от того, как выполняются другие задачи.
Добавление рисунка для иллюстрации.По сути, я хотел бы видеть еще два сплошных зеленых прямоугольника в первом ряду, в то время как третий ряд все еще находится позади.

Редактировать:
После объяснения y2k-shubham я попытался реализовать его.Но это все еще не работает.Быстрое задание начинается с 2019-01-30 00
, заканчивается через секунду и не запускается 2019-01-30 01
, поскольку медленное задание все еще выполняется.Если возможно, было бы идеально запустить параллельно 2019-01-30 01
, 2019-01-30 02
, 2019-01-30 03
... если возможно
Пример добавления кода
import time
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2019, 1, 30, 0, 0, 0),
'trigger_rule': TriggerRule.DUMMY
}
dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')
def fast(**kwargs):
return 1
def slow(**kwargs):
time.sleep(600)
return 1
fast_task = PythonOperator(
task_id='fast',
python_callable=fast,
provide_context=True,
priority_weight=10000,
pool='fast_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
slow_task = PythonOperator(
task_id='slow',
python_callable=slow,
provide_context=True,
priority_weight=500,
pool='slow_pool',
# weight_rule='upstream', # using 1.9, this param doesn't exist
dag=dag
)
fast_task >> slow_task # not working