У меня есть группа обеспечения доступности баз данных, в которой имеется 30 (или более) динамически создаваемых параллельных задач.
У меня установлена опция concurrency
для этой группы обеспечения доступности баз данных, чтобы при выполнении истории только один прогон DAG выполнялся,Когда я запускаю его на своем сервере, на самом деле параллельно выполняется только 16 задач, а остальные 14 просто ждут в очереди.
Какой параметр мне следует изменить, чтобы у меня был запущен только 1 прогон DAG, но свсе 30+ задач, выполняющихся параллельно?
Согласно этому FAQ , похоже, что это одна из dag_concurrency
или max_active_runs_per_dag
, но первая кажется перегруженнойуже установив concurrency
, в то время как последний, казалось, не имел никакого эффекта (или я фактически испортил свою настройку).Вот пример кода:
import datetime as dt
import logging
from airflow.operators.dummy_operator import DummyOperator
import config
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': True,
'wait_for_downstream': True,
'concurrency': 1,
'retries': 0,
}
def print_operators(ds, **kwargs):
logging.info(f"Task {kwargs.get('task_instance_key_str', 'unknown_task_instance')}")
dag = DAG(
dag_id='test_parallelism_dag',
start_date=dt.datetime(2019, 1, 1),
default_args=default_args,
schedule_interval='@daily',
catchup=True,
template_searchpath=[config.DAGS_PATH],
params={'schema': config.SCHEMA_DB},
max_active_runs=1,
)
print_operators = [PythonOperator(
task_id=f'test_parallelism_dag.print_operator_{i}',
python_callable=print_operators,
provide_context=True,
dag=dag
) for i in range(60)]
dummy_operator_start = DummyOperator(
task_id=f'test_parallelism_dag.dummy_operator_start',
)
dummy_operator_end = DummyOperator(
task_id=f'test_parallelism_dag.dummy_operator_end',
)
dummy_operator_start >> print_operators >> dummy_operator_end
РЕДАКТИРОВАТЬ 1 : Мой текущий airflow.cfg
содержит:
executor = SequentialExecutor
parallelism = 32
dag_concurrency = 24
max_active_runs_per_dag = 26
Мои переменные env следующие (установите их всеотличается, чтобы легко определить, кто помогает):
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__DAG_CONCURRENCY=18
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=20
AIRFLOW__CORE__WORKER_CONCURRENCY=22
И с этим у меня есть следующая диаграмма Ганта:
Какой вид дает мне подсказку, чтоустановка переменной env DAG_CONCURRENCY работает.