Завершить работу сельдерея и повторить задачу - PullRequest
1 голос
/ 27 мая 2020

Мне нужно реализовать следующий лог c в задаче сельдерея: если выполняется какое-то условие, выключить текущего рабочего и повторить задачу.

Протестировано на примере задачи:

@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exection')
    except Exception as exc:
        print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
        app.control.shutdown(destination=[self.request.hostname])  # send shutdown signal to the current worker
        raise self.retry(exc=exc, countdown=5)
    print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
    return 'some result'

Но дает странные результаты, шаги:

  1. Запустить воркер: celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1.
  2. Pu sh задачу в очередь test_queue.
  3. Рабочий поймал и выключил. Я открыл список задач в 'test_queue' в RabbitMQ и увидел:
    • Исходная задача, отправленная издателем, retries = 0 (происходит из вызова app.control.shutdown ());
    • Копировать исходной задачи (с тем же идентификатором), retries = 1 (происходит от вызова self.retry ()).
  4. Затем я запустил другого воркера в ту же очередь, он поймал задачу и выключение тоже. Но на Broker еще одна копия исходной задачи была помещена в очередь с тем же идентификатором и retries = 1. Итак, у меня в очереди было 3 задачи. Все последующие прогоны воркеров ставили + 1 новую задачу в очередь. Условие max_retries = 1 в этом случае не сработало.

Что я пробовал:

  1. Установите task_reject_on_worker_lost = True в celeryconfig.py и выполните ту же задачу. Результат: ничего не изменилось.
  2. Оставить в задаче воркера только вызов выключения. Результат: при каждой попытке возвращается только исходная задача (дублирование задач отсутствует), но повторные попытки не учитываются (всегда установлено значение 0);
  3. Добавить app.control.revoke(self.request.id) перед выключением и повторные вызовы в рабочий (на основе это ). Результат: после первой попытки получилось то же самое (2 задачи в очереди), но когда я запустил второй рабочий, очередь сбрасывалась и ничего не запускалось. Итак, задача потеряна и не повторяется.

Есть ли способ не sh вернуть исходную задачу в очередь во время вызова app.control.shutdown()? Похоже, это причина root. Или не могли бы вы предложить другой обходной путь, который позволит реализовать правильную логику c, указанную выше.

Настройка: RabbitMQ 3.8.2, сельдерей 4.1.0, python 3.5.4

Настройки в celeryconfig.py :

task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True

1 Ответ

1 голос
/ 27 мая 2020

Похоже, проблема task_acks_late в вашем файле конфигурации. Используя это, вы говорите: «Удаляйте задачу из очереди только после того, как я закончу выполнение». Затем вы убиваете рабочего, чтобы он никогда не был подтвержден (и вы получаете дубликаты задачи).

...