Как очистить все задачи определенной очереди с сельдереем в python? - PullRequest
16 голосов
/ 27 октября 2011

Как очистить все запланированные и запущенные задачи определенной очереди с помощью сельдерея в python?Вопросы кажутся довольно простыми, но, чтобы добавить, я не ищу код командной строки

У меня есть следующая строка, которая определяет очередь и хотела бы очистить эту очередь для управления задачами:

CELERY_ROUTES = {"socialreport.tasks.twitter_save": {"queue": "twitter_save"}}

В 1 момент времени я хочу очистить все задачи в очереди twitter_save с помощью кода Python, может быть, с функцией трансляции?Я не мог найти документацию об этом.Это возможно?

Ответы [ 3 ]

34 голосов
/ 06 декабря 2013

просто чтобы обновить ответ @Sam Stoelinga для сельдерея 3.1, теперь это можно сделать так на терминале:

celery amqp queue.purge <QUEUE_NAME>

Для Django обязательно запустите его из файла manage.py:

./manage.py celery amqp queue.purge <QUEUE_NAME> 

Если нет, убедитесь, что сельдерей может правильно указывать брокеру, установив флаг --broker=.

7 голосов
/ 28 июня 2014

Оригинальный ответ не работает для сельдерея 3.1.Обновление Hassek - правильная команда, если вы хотите сделать это из командной строки.Но если вы хотите сделать это программно , сделайте следующее:

Предполагая, что вы запустили приложение Celery как:

celery_app = Celery(...)

Тогда:

import celery.bin.amqp
amqp = celery.bin.amqp.amqp(app = celery_app)
amqp.run('queue.purge', 'name_of_your_queue')

Это удобно для случаев, когда вы поставили в очередь кучу задач, и одна задача сталкивается с фатальным состоянием, которое, как вы знаете, помешает выполнению остальных задач.

Например, вы поставили в очередь кучу задач веб-сканера, и в середине ваших задач IP-адрес вашего сервера блокируется.Нет смысла выполнять остальные задачи.Так что в этом случае ваша задача сама может очистить свою очередь.

6 голосов
/ 30 октября 2011

Лол, это довольно просто, надеюсь, кто-нибудь еще сможет мне помочь.

from celery.bin.camqadm import camqadm
camqadm('queue.purge', queue_name_as_string)

Единственная проблема с этим, мне все еще нужно остановить celeryd перед очисткой очереди, после очистки мне нужно снова запустить celeryd для обработки задач для очереди. Обновлю этот вопрос, если мне удастся.

Мне удалось, но, пожалуйста, исправьте меня, если это не хороший способ остановить сельдерей, очистить очередь и запустить ее снова. Я знаю, что использую термин, потому что на самом деле хочу, чтобы он был прерван.

kill_command =  "ps auxww | grep 'celeryd -n twitter_save' | awk '{print $2}' | xargs kill -9"
subprocess.call(kill_command, shell=True)

camqadm('queue.purge', 'twitter_save')
rerun_command = "/home/samos/Software/virt_env/twittersyncv1/bin/python %s/manage.py celeryd -n twitter_save -l info -Q twitter_save" % settings.PROJECT_ROOT

os.popen(rerun_command+' &')
send_task("socialreport.tasks.twitter_save")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...