Как я могу контролировать параллельность или параллелизм DAG Airflow? - PullRequest
1 голос
/ 30 мая 2019

В некоторых моих установках Airflow запланированные для выполнения группы DAG или задачи не запускаются, даже если планировщик загружен не полностью.Как я могу увеличить количество групп обеспечения доступности баз данных или задач, которые могут выполняться одновременно?

Аналогичным образом, если моя установка находится под высокой нагрузкой и я хочу ограничить, насколько быстро мои работники Airflow выполняют задачи из очереди, что я могу настроить?

Ответы [ 2 ]

2 голосов
/ 30 мая 2019

Вот расширенный список параметров конфигурации, доступных в Airflow v1.10.2. Некоторые из них могут быть установлены для каждой группы DAG или для каждого оператора, и могут не соответствовать значениям по умолчанию для всей установки, если они не указаны.


Параметры, которые можно указать для каждой группы DAG :

  • concurrency: количество экземпляров задач, которым разрешено одновременное выполнение для всех активных запусков группы обеспечения доступности баз данных, для которой установлено. По умолчанию core.dag_concurrency, если не установлено
  • max_active_runs: максимальное количество активных прогонов для этой группы DAG. Планировщик не будет создавать новые активные прогоны DAG после достижения этого предела. По умолчанию core.max_active_runs_per_dag, если не установлено

Примеры:

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)

Опции, которые можно указать для каждого оператора :

  • pool: пул для выполнения задачи. Пулы могут использоваться для ограничения параллелизма для только подмножества задач
  • task_concurrency: лимит для параллелизма на уровне задач
* +1037 * Пример:
t1 = BaseOperator(pool='my_custom_pool', task_concurrency=12)

Опции, которые указаны для всей настройки воздушного потока :

  • core.parallelism: максимальное количество задач, выполняемых во всей установке Airflow
  • core.dag_concurrency: максимальное количество задач, которые могут быть запущены на группу обеспечения доступности баз данных (для нескольких прогонов DAG )
  • core.non_pooled_task_slot_count: количество слотов задач, выделенных для задач, не запущенных в пуле
  • core.max_active_runs_per_dag: максимальное количество активных DAG прогонов , на DAG
  • scheduler.max_threads: сколько потоков должен использовать процесс планировщика для планирования DAG
  • celery.worker_concurrency: количество экземпляров задач, которые работник будет принимать при использовании CeleryExecutor
  • celery.sync_parallelism: число процессов, которые CeleryExecutor должен использовать для синхронизации состояния задачи
0 голосов
/ 17 июня 2019

Проверьте конфигурацию воздушного потока, для которого используется core.executor .SequentialExecutor будет выполняться последовательно, поэтому вы можете выбрать Local Executor или Clery Executor, которые выполняют задачу параллельно.После этого вы можете использовать другие параметры, указанные @ hexacyanide

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...