django + celery: отключить предварительную выборку для одного рабочего, есть ли ошибка? - PullRequest
1 голос
/ 08 октября 2019

У меня есть проект Django с сельдереем

Из-за ограничений ОЗУ я могу запускать только два рабочих процесса.

У меня есть сочетание «медленных» и «быстрых» задач. Быстрые задачи должны быть выполнены как можно скорее. В короткий промежуток времени (от 0,1 до 3 с) может быть много быстрых задач, поэтому в идеале оба ЦП должны их обрабатывать.

Медленные задачи могут выполняться в течение нескольких минут, но результат может быть отложен.

Медленные задачи происходят реже, но может случиться так, что 2 или 3 помещаются в очередь одновременно.

Моя идея заключалась в том, чтобы иметь одну:

  • 1 рабочий сельдерея W1 с параллелизмом 1, который обрабатывает только быстрые задачи
  • 1 рабочий сельдерея W2 с параллелизмом 1, который может обрабатывать быстрые и медленные задачи.

сельдерей по умолчанию имеет предварительную выборку задачмножитель (https://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-prefetch-multiplier) из 4, что означает, что 4 быстрых задания могут быть поставлены в очередь позади медленного задания и могут быть отложены на несколько минут. Таким образом, я хотел бы отключить предварительную выборку для рабочего W2. Документ гласит:

Чтобы отключить предварительную выборку, установите для worker_prefetch_multiplier значение 1. Изменение этого параметра на 0 позволит работнику продолжать потреблять столько сообщений, сколько ему нужно.

Однако, что я наблюдаю, так это то, что с prefetch_multiplier, равным 1, одна задача предварительно выбирается и все равно будет отложена из-за медленной задачи.

Это ошибка документации? Это ошибка реализации? Или я неправильно понимаю документацию? Есть ли способ реализовать то, что я хочу?

Команды, которые я выполняю для запуска рабочих:

celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 0
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast

мои настройки сельдерея по умолчанию, кроме:

CELERY_BROKER_URL = "pyamqp://*****@localhost:5672/mini"
CELERY_TASK_ROUTES = {
    'app1.tasks.task_fast':  {"queue": "fast"},
    'app1.tasks.task_slow':  {"queue": "slow"},
    }

Файл celery.py моего проекта django:

from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'miniclry.settings')
app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

__init__.py моего проекта django

from .celery import app as celery_app
__all__ = ('celery_app',)

Код моих работников

import time, logging
from celery import shared_task
from miniclry.celery import app as celery_app
logger = logging.getLogger(__name__)

@shared_task
def task_fast(delay=0.1):
    logger.warning("fast in")
    time.sleep(delay)
    logger.warning("fast out")

@shared_task
def task_slow(delay=30):
    logger.warning("slow in")
    time.sleep(delay)
    logger.warning("slow out")

Если я выполняю следующее из командной консоли, я вижу, что одна быстрая задача выполняется только после того, как медленная задача закончена.

from app1.tasks import task_fast, task_slow

task_slow.delay()
for i in range(30):
    task_fast.delay()

Кто-нибудь может помочь?

Я мог бы опубликоватьвесь тестовый проект, если это считается полезным. Просто посоветуйте рекомендуемый SO способ обмена такого рода проектами

Информация о версии:

  • сельдерей == 4.3.0
  • Django == 1.11.25
  • Python 2.7.12

1 Ответ

1 голос
/ 10 октября 2019

Я подтверждаю проблему, есть ошибка в этом разделе документации. worker_prefetch_multiplier = 1 будет, как сказано, установить предварительную выборку работника равной 1, означает, что рабочий будет удерживать еще одну задачу в дополнение к той, которая выполняется в данный момент.

Для фактического отключения предварительной выборки также необходимо использоватьtask_acks_late = True вместе с настройкой предварительной выборки, см. этот раздел документов

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...