Как получить воздушный поток, чтобы добавить тысячи задач к сельдерею одновременно? - PullRequest
0 голосов
/ 27 июня 2018

Я оцениваю Airflow 1.9.0 для наших потребностей распределенной оркестровки (используя CeleryExecutor и RabbitMQ), и я вижу что-то странное.

Я сделал даг, состоящий из трех этапов:

  1. начать
  2. одновременно разворачивает и запускает N задач
  3. отделка

N может быть большим, может быть до 10K. Я ожидаю увидеть N задач, сброшенных в очередь Rabbit, когда начнется этап 2. Вместо этого я вижу только несколько сотен , добавленных за один раз. По мере того, как рабочие обрабатывают задачи, а очередь уменьшается, в сельдерей / кролик добавляется больше. В конце концов, это действительно завершается, однако я бы предпочел, чтобы он сразу же сбрасывал ВСЕ работы (все задачи по 10 КБ) в Celery по двум причинам:

  1. Текущий способ делает планировщик долгоживущим и сохраняющим состояние. Планировщик может умереть только после завершения 5K, в этом случае оставшиеся 5K задачи никогда не будут добавлены (я проверял это)

  2. Я хочу использовать размер очереди Кролика в качестве метрики для запуска событий автомасштабирования, чтобы добавить больше работников. Поэтому мне нужна точная картина того, сколько выдающихся работ осталось (10 тысяч, а не несколько сотен)

Я предполагаю, что у планировщика есть какой-то дроссель, который удерживает его от выгрузки всех 10K-сообщений одновременно? Если это так, это настраивается?

К вашему сведению Я уже установил «параллелизм» на 10K в потоке воздуха. Cfg

Вот мой тестовый даг:

# This dag tests how well airflow fans out

from airflow import DAG
from datetime import datetime, timedelta

from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('fan_out', default_args=default_args, schedule_interval=None)

num_tasks = 10000

starting = BashOperator(
    task_id='starting',
    bash_command='echo starting',
    dag=dag
)

all_done = BashOperator(
    task_id='all_done',
    bash_command='echo all done',
    dag=dag)

for i in range(0, num_tasks):
    task = BashOperator(
        task_id='say_hello_' + str(i),
        bash_command='echo hello world',
        dag=dag)
    task.set_upstream(starting)
    task.set_downstream(all_done)

Ответы [ 2 ]

0 голосов
/ 28 июня 2018

Спасибо тем, кто предложил другие настройки параллелизма. Методом проб и ошибок я узнал, что мне нужно установить все три из них:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
 - AIRFLOW__CORE__DAG_CONCURRENCY=10000

При включении только этих двух я могу добраться до 10K, но это очень медленно, добавляя только 100 новых задач в пакетах каждые 30 секунд, ступенчато:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000

Если я включу только эти два параметра, это будет тот же шаблон «ступеньки», с добавлением 128 каждые 30 секунд:

 - AIRFLOW__CORE__PARALLELISM=10000
 - AIRFLOW__CORE__DAG_CONCURRENCY=10000

Но если я установлю все три, это добавит 10K к очереди за один выстрел.

0 голосов
/ 27 июня 2018

Есть пара других настроек, которые вы хотите увеличить.

Под [core] увеличение non_pooled_task_slot_count. Это позволит фактически поставить в очередь больше задач в сельдерее.

Под [celery] увеличение celeryd_concurrency. Это увеличит количество задач, которые каждый работник будет пытаться запустить из очереди одновременно.

Как говорится, в ответ на вашу первую причину ...

Хотя значение true, остальные задачи не будут помещены в очередь, если планировщик не запущен, но это потому, что планировщик Airflow рассчитан на длительный срок эксплуатации. Он всегда должен работать, когда работают ваши работники. Если планировщик будет убит или умрет по какой-либо причине, после того, как он начнет резервное копирование, он начнет с того места, где остановился.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...