Сельдерей: различные настройки для task_acks_late для рабочего / добавить пользовательскую опцию для сельдерея - PullRequest
1 голос
/ 10 октября 2019

Этот вопрос является продолжением django + celery: отключить предварительную выборку для одного рабочего, есть ли ошибка?

У меня была проблема с сельдереем (см. Вопрос, которому я следуювверх) и для ее решения я хотел бы иметь двух рабочих из сельдерея с -concurrency 1 каждый, но с двумя различными настройками task_acks_late.

Мой текущий подход работает, но, на мой взгляд, не очень красиво. Я делаю следующее:

в settings.py моего проекта django:

CELERY_TASK_ACKS_LATE = os.environ.get("LACK", "False") == "True"

Это позволяет мне запускать работников сельдерея с помощью следующих команд:

LACK=True celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast

Что было бы более интуитивно понятно, если бы я мог сделать что-то вроде:

celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 --late-ack=True
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast --late-ack=False

Я нашел Инициализация разных работников сельдерея с разными значениями , но не понимаю, как встроить это в мойконтекст джанго / сельдерея. В какие файлы мне нужно будет добавить код, который добавляет аргумент в анализатор, и как я могу использовать пользовательский параметр для изменения task_acks_late настроек сельдерея.

Обновление: Спасибо @В ответе Гринева мне удалось добавить пользовательские опции для сельдерея. Однако, похоже, что изменение конфига с помощью этого механизма «приходит слишком поздно», и эта последовательность не принимается во внимание.

1 Ответ

1 голос
/ 14 октября 2019

Одним из возможных решений здесь является предоставление acks_late=True в качестве аргумента декоратора shared_task, учитывая ваш код из предыдущего вопроса:

@shared_task(acks_late=True)
def task_fast(delay=0.1):
    logger.warning("fast in")
    time.sleep(delay)
    logger.warning("fast out")

UPD. У меня нет task_acks_late для установки с использованием этого подхода, но вы можете добавить аргумент командной строки следующим образом.

Вы уже связались с решением. Я не вижу здесь никакой специфики django, просто поместите код parser.add_argument туда, где вы определили свой app, учитывая ваш код из предыдущего вопроса, у вас будет что-то вроде этого:

app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')

def add_worker_arguments(parser):
    parser.add_argument('--late-ack', default=False)

app.user_options['worker'].add(add_worker_arguments)

Тогда вы можете получить доступ к значению аргумента в celeryd_init обработчике сигнала

@celeryd_init.connect
def configure_worker(sender=None, conf=None, options=None, **kwargs):
    conf.task_acks_late = options.get('late-ack') # get custom argument value from options
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...