Apache Airflow: запускать все параллельные задачи за один прогон DAG - PullRequest
0 голосов
/ 21 марта 2019

У меня есть группа обеспечения доступности баз данных, в которой имеется 30 (или более) динамически создаваемых параллельных задач.

У меня установлена ​​опция concurrency для этой группы обеспечения доступности баз данных, чтобы при выполнении истории только один прогон DAG выполнялся,Когда я запускаю его на своем сервере, на самом деле параллельно выполняется только 16 задач, а остальные 14 просто ждут в очереди.

Какой параметр мне следует изменить, чтобы у меня был запущен только 1 прогон DAG, но свсе 30+ задач, выполняющихся параллельно?

Согласно этому FAQ , похоже, что это одна из dag_concurrency или max_active_runs_per_dag, но первая кажется перегруженнойуже установив concurrency, в то время как последний, казалось, не имел никакого эффекта (или я фактически испортил свою настройку).Вот пример кода:

import datetime as dt
import logging

from airflow.operators.dummy_operator import DummyOperator

import config

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
    'retries': 0,
}


def print_operators(ds, **kwargs):
    logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")


dag = DAG(
    dag_id='test_parallelism_dag',
    start_date=dt.datetime(2019, 1, 1),
    default_args=default_args,
    schedule_interval='@daily',
    catchup=True,
    template_searchpath=[config.DAGS_PATH],
    params={'schema': config.SCHEMA_DB},
    max_active_runs=1,
)

print_operators = [PythonOperator(
    task_id=f'test_parallelism_dag.print_operator_{i}',
    python_callable=print_operators,
    provide_context=True,
    dag=dag
) for i in range(60)]

dummy_operator_start = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_start',
)

dummy_operator_end = DummyOperator(
    task_id=f'test_parallelism_dag.dummy_operator_end',
)

dummy_operator_start >> print_operators >> dummy_operator_end

РЕДАКТИРОВАТЬ 1 : Мой текущий airflow.cfg содержит:

executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26

Мои переменные env следующие (установите их всеотличается, чтобы легко определить, кто помогает):

AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22

И с этим у меня есть следующая диаграмма Ганта: enter image description here

Какой вид дает мне подсказку, чтоустановка переменной env DAG_CONCURRENCY работает.

Ответы [ 2 ]

1 голос
/ 21 марта 2019

Обновите также конфигурацию concurrency в файле airflow.cfg.Если это 16, увеличьте его до 32.

Если вы используете Celery Executor, измените worker_concurrency на 32.

0 голосов
/ 21 марта 2019

Фактический параметр, который нужно изменить, был dag_concurrency в airflow.cfg или переопределите его с помощью AIRFLOW__CORE__DAG_CONCURRENCY переменной env.

Как для документов, на которые я ссылался в своем вопросе :

concurrency: Планировщик воздушного потока будет запускать не более $concurrency экземпляров задач для вашей группы доступности базы данных в любой момент времени.Параллельность определена в вашей группе Airflow DAG.Если вы не установите параллелизм в своей группе обеспечения доступности баз данных, планировщик будет использовать значение по умолчанию из записи dag_concurrency в вашем файле airflow.cfg.

Что означает следующий упрощенный код:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    'concurrency': 1,
}


dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
)

следует переписать так:

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
}


dag = DAG(
    dag_id='test_parallelism_dag',
    default_args=default_args,
    max_active_runs=1,
    concurrency=30
)

Мой код на самом деле ошибочно полагает, что default_args в какой-то момент заменяет фактические kwargs на конструктор DAG.Я не знаю, что привело меня к такому выводу тогда, но я предполагаю, что при установке concurrency в 1 есть какой-то черновой остаток, который на самом деле ни на что не влиял, и фактический параллелизм DAG был установлен из config default, который равен 16.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...