Воздушный поток запускает задачи в разное время в одном и том же даге? - PullRequest
0 голосов
/ 07 февраля 2019

У меня есть 30 отдельных задач в dag, они не зависят друг от друга.Задачи запускают один и тот же код.Единственное различие заключается в объеме данных, некоторые задачи будут завершены в секундах, некоторые задачи займут 2 часа или более.

Проблема во время захвата, задачи, которые заканчиваются в секундах, блокируются задачами, которые занимают часызакончите, прежде чем они перейдут к следующей дате исполнения.

Я могу разбить их на отдельные даги, но это кажется глупым, и в будущем число задач возрастет до большего.

Есть лилюбой способ запустить задачи в одном и том же пакете в разное время выполнения?Как и в случае завершения задачи, выберите следующую дату выполнения, независимо от того, как выполняются другие задачи.

Добавление рисунка для иллюстрации.По сути, я хотел бы видеть еще два сплошных зеленых прямоугольника в первом ряду, в то время как третий ряд все еще находится позади.

airflow_dag_ideal

Редактировать:

После объяснения y2k-shubham я попытался реализовать его.Но это все еще не работает.Быстрое задание начинается с 2019-01-30 00, заканчивается через секунду и не запускается 2019-01-30 01, поскольку медленное задание все еще выполняется.Если возможно, было бы идеально запустить параллельно 2019-01-30 01, 2019-01-30 02, 2019-01-30 03 ... если возможно

Пример добавления кода

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 30, 0, 0, 0),
    'trigger_rule': TriggerRule.DUMMY
}

dag = DAG(dag_id='test_dag', default_args=default_args, schedule_interval='@hourly')


def fast(**kwargs):
    return 1


def slow(**kwargs):
    time.sleep(600)
    return 1


fast_task = PythonOperator(
    task_id='fast',
    python_callable=fast,
    provide_context=True,
    priority_weight=10000,
    pool='fast_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

slow_task = PythonOperator(
    task_id='slow',
    python_callable=slow,
    provide_context=True,
    priority_weight=500,
    pool='slow_pool',
    # weight_rule='upstream', # using 1.9, this param doesn't exist
    dag=dag
)

fast_task >> slow_task # not working

Ответы [ 2 ]

0 голосов
/ 08 февраля 2019

Я могу придумать 3 возможных решения ваших проблем (добавлю больше альтернатив, когда они придут в голову)

  1. Установить start_date на отдельных task с в пределах DAG (кроме start_date из DAG самого себя), как сказано здесь .Однако я бы никогда не одобрил бы этот подход , потому что это было бы как шаг назад к тем же основанным на времени кронам , которые Airflow пытается заменить.

  2. Используйте pool с для разделения task с на время выполнения / приоритет .Вот идея (вам может потребоваться переработать в соответствии с вашими требованиями): Поместите все крошечные task с в tiny_task_pool и все большие в big_task_pool.Пусть tiny_task_pool имеет значительно большее число slot с, чем big_task_pool.Это сделает голодание ваших крошечных задач гораздо менее вероятным.Вы можете проявить творческий подход с еще большим количеством уровней из pool с.

  3. Даже если ваши task s не имеют реальных зависимостей междуим, это не должно сильно повредить, чтобы сознательно ввести некоторые зависимости так, чтобы все (или большинство) большие задачи были сделаны downstream из крошечных (иследовательно измените структуру вашего DAG).Это озвучило бы подход самая короткая работа в первую очередь .Вы также можете исследовать priority_weight / priority_rule, чтобы получить еще более детальный контроль.

Все вышеупомянутые альтернативы предполагают, что длина task s '(продолжительность исполнения) известны заранее.В реальном мире это может быть не так;или даже если это так, оно может постепенно меняться со временем.Для этого я бы предложил вам настроить ваш сценарий определения dag , чтобы учесть среднее (или медианное) время выполнения ваших task s за последние 'n' запускичтобы определить их приоритет.

  • Для метода start_date просто укажите более поздний start_date (фактически тот же день, более позднее время) для task с, который работал дольше в предыдущих запусках
  • Для метода pool s перемещайте task s вокруг различных pool s в зависимости от их предыдущей продолжительности работы
  • Для метода задачи-зависимости увеличьте время работы taskс downstream.Это может показаться сложным, но вы можете визуализировать это так: Создайте 3 DummyOperator s и свяжите их (один за другим).Теперь вам нужно заполнить все маленькие задачи между первыми 2 DummyOperator с и большими между следующими двумя.
0 голосов
/ 07 февраля 2019

Вероятно, это связано с тем, что у вас меньше временных интервалов выполнения, чем медленных.Планировщик не особенно заботится о том, в каком порядке он выполняет задачи, потому что вы сказали, что вас это тоже не волнует.

Если это действительно важно для вас, их, вероятно, следует разбить на различные пакеты илиВы должны объявить зависимости, которые вы хотите, чтобы сначала выполнялись более дешевые задачи.Есть множество способов выразить то, что вы хотите, вы просто должны выяснить, что это такое.

...