На данный момент Airflow (на самом деле Celery) не предоставляет простого способа сделать это. Таким образом, мы решили проблему, используя несколько хаков с предложением @ Иэн Шелвингтон .
Мы поместили новую строку в airflow.cfg , которая дает нам номера процессорных ядер, на которые сельдерей должен запускать процессы, которые он выбирает.
Затем мы извлекаем id всех сельдереев ForkPoolWorker
(s) и, используя taskset
, устанавливаем сродство к поставляемым ядрам.
Мы написали bash скрипт start_worker. sh для начала работника. Кроме того, мы ограничили число работников до 1, но увеличили worker_concurrency
$ AIRFLOW_HOME / airflow.cfg так:
...
worker_concurrency = 36
# We added the following line
cores_available = 12,13,14,15...45,46,47
...
Псевдокод для start_worker. sh выглядит так:
# start a worker in daemon mode
airflow worker -D
# get the list of cpu cores
CORES=$(grep "cores_available" | cut -d '=' -f 2)
# get the PID of celery ForkWorkers
PIDS=$(ps -eaf | grep celery.*ForkPoolWorker*)
# for every PID, use the core
for idx in len(workers_count); do
taskset -c CORES[idx] PIDS[idx]