У меня есть проект Django с сельдереем
Из-за ограничений ОЗУ я могу запускать только два рабочих процесса.
У меня есть сочетание «медленных» и «быстрых» задач. Быстрые задачи должны быть выполнены как можно скорее. В короткий промежуток времени (от 0,1 до 3 с) может быть много быстрых задач, поэтому в идеале оба ЦП должны их обрабатывать.
Медленные задачи могут выполняться в течение нескольких минут, но результат может быть отложен.
Медленные задачи происходят реже, но может случиться так, что 2 или 3 помещаются в очередь одновременно.
Моя идея заключалась в том, чтобы иметь одну:
- 1 рабочий сельдерея W1 с параллелизмом 1, который обрабатывает только быстрые задачи
- 1 рабочий сельдерея W2 с параллелизмом 1, который может обрабатывать быстрые и медленные задачи.
сельдерей по умолчанию имеет предварительную выборку задачмножитель (https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-prefetch-multiplier) из 4, что означает, что 4 быстрых задания могут быть поставлены в очередь позади медленного задания и могут быть отложены на несколько минут. Таким образом, я хотел бы отключить предварительную выборку для рабочего W2. Документ гласит:
Чтобы отключить предварительную выборку, установите для worker_prefetch_multiplier значение 1. Изменение этого параметра на 0 позволит работнику продолжать потреблять столько сообщений, сколько ему нужно.
Однако, что я наблюдаю, так это то, что с prefetch_multiplier, равным 1, одна задача предварительно выбирается и все равно будет отложена из-за медленной задачи.
Это ошибка документации? Это ошибка реализации? Или я неправильно понимаю документацию? Есть ли способ реализовать то, что я хочу?
Команды, которые я выполняю для запуска рабочих:
celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 0
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast
мои настройки сельдерея по умолчанию, кроме:
CELERY_BROKER_URL = "pyamqp://*****@localhost:5672/mini"
CELERY_TASK_ROUTES = {
'app1.tasks.task_fast': {"queue": "fast"},
'app1.tasks.task_slow': {"queue": "slow"},
}
Файл celery.py моего проекта django:
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'miniclry.settings')
app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
__init__.py
моего проекта django
from .celery import app as celery_app
__all__ = ('celery_app',)
Код моих работников
import time, logging
from celery import shared_task
from miniclry.celery import app as celery_app
logger = logging.getLogger(__name__)
@shared_task
def task_fast(delay=0.1):
logger.warning("fast in")
time.sleep(delay)
logger.warning("fast out")
@shared_task
def task_slow(delay=30):
logger.warning("slow in")
time.sleep(delay)
logger.warning("slow out")
Если я выполняю следующее из командной консоли, я вижу, что одна быстрая задача выполняется только после того, как медленная задача закончена.
from app1.tasks import task_fast, task_slow
task_slow.delay()
for i in range(30):
task_fast.delay()
Кто-нибудь может помочь?
Я мог бы опубликоватьвесь тестовый проект, если это считается полезным. Просто посоветуйте рекомендуемый SO способ обмена такого рода проектами
Информация о версии:
- сельдерей == 4.3.0
- Django == 1.11.25
- Python 2.7.12