Основываясь на результатах поиска @kaxil, опубликованном выше, решение для обхода проблемы, если вы все еще хотите выполнять задачи внутри подпадала параллельно, создает функцию-обертку для явной передачи executor
при конструкции SubDagOperator
:
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor
def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)
позвоните sub_dag_operator_with_default_executor
, когда вы создали свой оператор subdag. Для того, чтобы избавить оператора sub dag от проблем производительности
Мы должны изменить исполнителя по умолчанию для subdag_operator на SequentialExecutor. Поддагоператор не учитывает пул воздушных потоков, поэтому он может потреблять все рабочие ресурсы (например, в celeryExecutor). Это вызывает проблемы, упомянутые в airflow-74, и ограничивает использование subdag_operator. Мы используем subdag_operator в производстве, указав с помощью последовательного исполнителя.
Мы предлагаем создать специальную очередь (в нашем случае мы указываем queue = 'mini') и работника сельдерея для обработки оператора subdag_operator, чтобы он не занимал все ресурсы вашего обычного работника сельдерея. Следующим образом:
dag = DAG(
dag_id=DAG_NAME,
description=f"{DAG_NAME}-{__version__}",
...
)
with dag:
ur_operator = sub_dag_operator_with_default_executor(
task_id=f"your_task_id",
subdag=load_sub_dag(
parent_dag_name=DAG_NAME,
child_dag_name=f"your_child_dag_name",
args=args,
concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
),
queue="mini",
dag=dag
)
Затем, когда вы создаете своего специального работника сельдерея (мы используем облегченный хост, такой как 2 ядра и память 3G), укажите AIRFLOW__CELERY__DEFAULT_QUEUE
как mini
, в зависимости от того, сколько операторов sub dag вы хотели бы запустить параллельно вам нужно создать несколько специальных рабочих из сельдерея, чтобы распределить нагрузку на ресурсы. Мы предлагаем, чтобы каждый специальный сотрудник из сельдерея заботился не более чем о 2 подпрограммах за раз, или он будет исчерпан (например, нехватка памяти на 2 ядрах). и хост памяти 3G)
Также вы можете отрегулировать concurrency
внутри своего подпада с помощью ENV VAR concurrency_in_sub_dag
, созданного на странице конфигурации UI Variables
воздушного потока.