Мне нужно реализовать следующий лог c в задаче сельдерея: если выполняется какое-то условие, выключить текущего рабочего и повторить задачу.
Протестировано на примере задачи:
@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
try:
raise Exception('test exection')
except Exception as exc:
print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
app.control.shutdown(destination=[self.request.hostname]) # send shutdown signal to the current worker
raise self.retry(exc=exc, countdown=5)
print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
return 'some result'
Но дает странные результаты, шаги:
- Запустить воркер:
celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1
. - Pu sh задачу в очередь test_queue.
- Рабочий поймал и выключил. Я открыл список задач в 'test_queue' в RabbitMQ и увидел:
- Исходная задача, отправленная издателем, retries = 0 (происходит из вызова app.control.shutdown ());
- Копировать исходной задачи (с тем же идентификатором), retries = 1 (происходит от вызова self.retry ()).
- Затем я запустил другого воркера в ту же очередь, он поймал задачу и выключение тоже. Но на Broker еще одна копия исходной задачи была помещена в очередь с тем же идентификатором и retries = 1. Итак, у меня в очереди было 3 задачи. Все последующие прогоны воркеров ставили + 1 новую задачу в очередь. Условие max_retries = 1 в этом случае не сработало.
Что я пробовал:
- Установите
task_reject_on_worker_lost = True
в celeryconfig.py и выполните ту же задачу. Результат: ничего не изменилось. - Оставить в задаче воркера только вызов выключения. Результат: при каждой попытке возвращается только исходная задача (дублирование задач отсутствует), но повторные попытки не учитываются (всегда установлено значение 0);
- Добавить
app.control.revoke(self.request.id)
перед выключением и повторные вызовы в рабочий (на основе это ). Результат: после первой попытки получилось то же самое (2 задачи в очереди), но когда я запустил второй рабочий, очередь сбрасывалась и ничего не запускалось. Итак, задача потеряна и не повторяется.
Есть ли способ не sh вернуть исходную задачу в очередь во время вызова app.control.shutdown()
? Похоже, это причина root. Или не могли бы вы предложить другой обходной путь, который позволит реализовать правильную логику c, указанную выше.
Настройка: RabbitMQ 3.8.2, сельдерей 4.1.0, python 3.5.4
Настройки в celeryconfig.py :
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True