Удаление всех нерешенных задач в сельдерее / rabbitmq - PullRequest
167 голосов
/ 22 августа 2011

Как я могу удалить все отложенные задачи, не зная task_id для каждой задачи?

Ответы [ 8 ]

249 голосов
/ 23 августа 2011

Из документов :

$ celery -A proj purge

или

from proj.celery import app
app.control.purge()

(РЕДАКТИРОВАТЬ: Обновлено с текущим методом.)

112 голосов
/ 05 декабря 2013

Для сельдерея 3,0 +:

$ celery purge

Чтобы очистить определенную очередь:

$ celery -Q queue_name purge
23 голосов
/ 04 ноября 2015

Для Celery 2.x и 3.x:

При использовании работника с параметром -Q для определения очередей, например

celery worker -Q queue1,queue2,queue3

, celery purge не будет работать, посколькуВы не можете передать ей параметры очереди.Будет удалена только очередь по умолчанию.Решение состоит в том, чтобы запустить ваших работников с параметром --purge, подобным этому:

celery worker -Q queue1,queue2,queue3 --purge

Это, однако, запустит работника.

Другой вариант - использовать подкоманду amqp из celery

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
10 голосов
/ 03 апреля 2016

в сельдерее 3 +:

CLI:

$ celery -A proj purge

Programatically:

>>> from proj.celery import app
>>> app.control.purge()

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

9 голосов
/ 04 октября 2014

Я обнаружил, что celery purge не работает для моей более сложной конфигурации сельдерея. Я использую несколько именованных очередей для разных целей:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
celery@web01.celery.pidbox                      0   1
celery@web02.celery.pidbox                      0   1
apns                                            0   1
apns@web01.celery.pidbox                        0   1
analytics                                       1   1
analytics@web01.celery.pidbox                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

Первый столбец - это имя очереди, второй - количество сообщений, ожидающих в очереди, а третий - количество прослушивателей этой очереди. Очереди:

  • сельдерей - Очередь для стандартных идемпотентных задач с сельдереем
  • apns - очередь задач Apple Push Notification Service, не совсем идемпотентная
  • analytics - Очередь для продолжительной ночной аналитики
  • *. Pidbox - очередь для рабочих команд, таких как выключение и сброс, по одной на одного работника (2 сельдерея, один работник apns, один аналитик)
  • bcast. * - Широковещательные очереди для отправки сообщений всем работникам, прослушивающим очередь (а не только первым, кто ее получил)
  • celeryev. * - Очереди событий Celery, для аналитики отчетов о задачах

Задача аналитики - это грубые задачи, которые отлично работали на небольших наборах данных, но теперь на их обработку уходит более 24 часов. Иногда, что-то идет не так, и он застревает в ожидании в базе данных. Его нужно переписать, но до тех пор, пока он не застрянет, я убиваю задачу, очищаю очередь и пытаюсь снова. Я обнаружил «застревание», просматривая количество сообщений для очереди аналитики, которое должно быть 0 (законченная аналитика) или 1 (ожидание завершения аналитики прошлой ночью). 2 или выше - это плохо, и я получаю электронное письмо.

celery purge предлагает удалить задачи из одной из широковещательных очередей, и я не вижу возможности выбрать другую именованную очередь.

Вот мой процесс:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
5 голосов
/ 20 января 2017

в сельдерее 3 +

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Очистить именованную очередь:

 celery -A proj amqp queue.purge <queue name>

Очистка настроенаочередь

celery -A proj purge

Я удалил сообщения, но в очереди еще остались сообщения?Ответ: Задачи подтверждаются (удаляются из очереди), как только они действительно выполняются.После того, как работник получил задачу, до ее фактического выполнения потребуется некоторое время, особенно если много задач уже ожидает выполнения.Сообщения, которые не были подтверждены, удерживаются работником до тех пор, пока он не закроет соединение с посредником (сервером AMQP).Когда это соединение закрыто (например, из-за того, что рабочий был остановлен), брокер будет повторно отправлять задачи следующему доступному рабочему (или тому же рабочему, когда оно будет перезапущено), поэтому для правильной очистки очереди ожидающих задач вынужно остановить всех рабочих, а затем очистить задачи с помощью celery.control.purge ().

Поэтому для очистки всей очереди рабочие должны быть остановлены.

1 голос
/ 29 апреля 2019

сельдерей 4 + команда celery purge для очистки всех настроенных очередей задач

celery -A *APPNAME* purge

программно:

from proj.celery import app
app.control.purge()

все отложенные задачи будут удалены. Справка: celerydoc

1 голос
/ 27 марта 2017

1.Чтобы правильно очистить очередь ожидающих задач, вы должны остановить всех рабочих (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

$ sudo rabbitmqctl stop

или (в случае, если RabbitMQ / брокер сообщений управляется Supervisor):

$ sudo supervisorctl stop all

2. ... и затем удалите задачи из определенной очереди:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Запустите RabbitMQ:

$ sudo rabbitmqctl start

или (в случае, если RabbitMQ управляется Supervisor):

$ sudo supervisorctl start all
...