Airflow 1.10.3 SubDag может выполнять только 1 задачу параллельно, даже если параллелизм равен 8 - PullRequest
1 голос
/ 09 мая 2019

Недавно я обновил Airflow с 1.9 до 1.10.3 (последний).

Однако я заметил проблему с производительностью, связанную с параллелизмом SubDag. Можно выбрать только 1 задачу внутри SubDag, что не так, как должно быть, наша настройка параллелизма для SubDag - 8.

см. Следующее: get_monthly_summary-214 и get_monthly_summary-215 - это два SubDag, они могут быть запущены в параллельном контроллере с помощью параллелизма родительского dag

enter image description here

Но при увеличении SubDag произнесите get_monthly_summary-214, тогда enter image description here Вы определенно можете видеть, что одновременно выполняется только одна задача, остальные поставлены в очередь, и она продолжает работать таким образом. Когда мы проверяем параллелизм SubDag, на самом деле это 8, как мы указали в коде: enter image description here

Мы настроили размер слотов пула, он равен 32, у нас есть 8 работников сельдерея, чтобы поднять задачу в очереди, и наша конфигурация воздушного потока, связанная с параллелизмом, выглядит следующим образом:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16

Также все SubDag конфигурируются с использованием очереди с именем mini, тогда как все его внутренние задачи являются очередью по умолчанию с именем default, поскольку у нас могут возникнуть некоторые проблемы с взаимоблокировками ранее, если мы запустим оба оператора SubDag и внутренние задачи SubDag в той же очереди. Я также пытался использовать очередь default для всех задач и операторов, это не помогает.

Старая версия 1.9 кажется хорошей, что каждый SubDag может выполнять несколько задач параллельно, мы что-то упустили?

Ответы [ 2 ]

3 голосов
/ 09 мая 2019

Это связано с тем, что в Airflow 1.9.0 SubdagOperator использовался исполнитель по умолчанию.

Воздушный поток 1.9.0 : https://github.com/apache/airflow/blob/1.9.0/airflow/operators/subdag_operator.py#L33

class SubDagOperator(BaseOperator):

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=GetDefaultExecutor(),
            *args, **kwargs):

Однаконачиная с Airflow 1.10 и далее, исполнитель по умолчанию для SubDagOperator изменяется на SequentialExecutor

Airflow> = 1.10 : https://github.com/apache/airflow/blob/1.10.0/airflow/operators/subdag_operator.py#L38

class SubDagOperator(BaseOperator):

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=SequentialExecutor(),
            *args, **kwargs):

Фиксация, которая его изменилаis https://github.com/apache/airflow/commit/64d950166773749c0e4aa0d7032b080cadd56a53#diff-45749879e4753a355c5bdb5203584698

И подробную причину, по которой это было изменено, можно найти в https://github.com/apache/airflow/pull/3251

Мы должны изменить исполнителя по умолчанию для subdag_operator на SequentialExecutor.Поддагоператор не учитывает пул воздушных потоков, поэтому он может потреблять все рабочие ресурсы (например, в celeryExecutor).Это вызывает проблемы, упомянутые в airflow-74, и ограничивает использование subdag_operator.Мы используем subdag_operator в производстве, указав с помощью последовательного исполнителя.

1 голос
/ 10 мая 2019

Основываясь на результатах поиска @kaxil, опубликованном выше, решение для обхода проблемы, если вы все еще хотите выполнять задачи внутри подпадала параллельно, создает функцию-обертку для явной передачи executor при конструкции SubDagOperator:

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
    return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)

позвоните sub_dag_operator_with_default_executor, когда вы создали свой оператор subdag. Для того, чтобы избавить оператора sub dag от проблем производительности

Мы должны изменить исполнителя по умолчанию для subdag_operator на SequentialExecutor. Поддагоператор не учитывает пул воздушных потоков, поэтому он может потреблять все рабочие ресурсы (например, в celeryExecutor). Это вызывает проблемы, упомянутые в airflow-74, и ограничивает использование subdag_operator. Мы используем subdag_operator в производстве, указав с помощью последовательного исполнителя.

Мы предлагаем создать специальную очередь (в нашем случае мы указываем queue = 'mini') и работника сельдерея для обработки оператора subdag_operator, чтобы он не занимал все ресурсы вашего обычного работника сельдерея. Следующим образом:

 dag = DAG(
    dag_id=DAG_NAME,
    description=f"{DAG_NAME}-{__version__}",
    ...
)    
with dag:
        ur_operator = sub_dag_operator_with_default_executor(
                task_id=f"your_task_id",
                subdag=load_sub_dag(
                    parent_dag_name=DAG_NAME,
                    child_dag_name=f"your_child_dag_name",
                    args=args,
                    concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
                ),
                queue="mini",
                dag=dag
            )

Затем, когда вы создаете своего специального работника сельдерея (мы используем облегченный хост, такой как 2 ядра и память 3G), укажите AIRFLOW__CELERY__DEFAULT_QUEUE как mini, в зависимости от того, сколько операторов sub dag вы хотели бы запустить параллельно вам нужно создать несколько специальных рабочих из сельдерея, чтобы распределить нагрузку на ресурсы. Мы предлагаем, чтобы каждый специальный сотрудник из сельдерея заботился не более чем о 2 подпрограммах за раз, или он будет исчерпан (например, нехватка памяти на 2 ядрах). и хост памяти 3G)

Также вы можете отрегулировать concurrency внутри своего подпада с помощью ENV VAR concurrency_in_sub_dag, созданного на странице конфигурации UI Variables воздушного потока.

...