Как указать ядра процессора для работников сельдерея? - PullRequest
2 голосов
/ 11 января 2020

Я использую Воздушный поток и сельдерей с ним. Я хочу указать список ядер ЦП, чтобы работники сельдерея и сельдерея могли распределять процессы только по этим конкретным ядрам.

Я ознакомился с документацией по сельдерею ( здесь ) с дополнительным акцентом на разделах, а именно: параллелизм, оптимизация и руководство для рабочих. Однако не смог найти решение.

Итак, как я могу гарантировать, что celery назначает задачи только конкретным ядрам процессора. Я не могу использовать taskset. Если можно связать каждый ForkPoolWorker и обработать его для определенного ядра процессора, тогда сельдерей станет для нас идеальным решением. Однако, если сельдерей управляет группой сердечников, нам это тоже удобно. Но нам нужно выделить ядра процессора для сельдерея.

Ответы [ 3 ]

1 голос
/ 23 января 2020

На данный момент Airflow (на самом деле Celery) не предоставляет простого способа сделать это. Таким образом, мы решили проблему, используя несколько хаков с предложением @ Иэн Шелвингтон .

Мы поместили новую строку в airflow.cfg , которая дает нам номера процессорных ядер, на которые сельдерей должен запускать процессы, которые он выбирает.

Затем мы извлекаем id всех сельдереев ForkPoolWorker (s) и, используя taskset, устанавливаем сродство к поставляемым ядрам.

Мы написали bash скрипт start_worker. sh для начала работника. Кроме того, мы ограничили число работников до 1, но увеличили worker_concurrency

$ AIRFLOW_HOME / airflow.cfg так:

...
worker_concurrency = 36
# We added the following line
cores_available = 12,13,14,15...45,46,47
...

Псевдокод для start_worker. sh выглядит так:

# start a worker in daemon mode
airflow worker -D
# get the list of cpu cores
CORES=$(grep "cores_available" | cut -d '=' -f 2)
# get the PID of celery ForkWorkers
PIDS=$(ps -eaf | grep celery.*ForkPoolWorker*)
# for every PID, use the core
for idx in len(workers_count); do
    taskset -c CORES[idx] PIDS[idx]  
0 голосов
/ 23 января 2020

Это почти всегда плохая идея, если вы не находитесь на причудливой архитектуре, такой как big.LITTLE, и вы хотите убедиться, что, например, системный планировщик не выполняет вычислительно дорогостоящую задачу (поток) на неправильном ядре. Сельдерей не может этого сделать, но вы можете сделать это вручную на Linux, используя taskset. Если вы установите сходство с рабочим Celery, то все рабочие процессы будут работать на этом конкретном ядре, что является ужасной идеей, если только ваш параллелизм не установлен на 1 (только один рабочий процесс).

Если вы все еще думаете это хорошая идея, и она решает вашу проблему, тогда я бы предложил вам попробовать написать обработчик worker_process_init , который в основном вызывает os.sched_setaffinity во вновь созданном рабочем процессе. Имейте в виду, что этот обработчик должен быть быстрым.

0 голосов
/ 23 января 2020

Вы пробовали установить worker_concurrency = # of cores?

http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker -конкурентность

...