Получить список задач в очереди в сельдерее - PullRequest
120 голосов
/ 05 апреля 2011

Как я могу получить список задач в очереди, которые еще предстоит обработать?

Ответы [ 12 ]

160 голосов
/ 21 февраля 2012

РЕДАКТИРОВАТЬ: см. Другие ответы для получения списка задач в очереди.

Вы должны посмотреть здесь: Справочник по сельдерею - осмотр рабочих

В основном это:

* * 1010

В зависимости от того, что вы хотите

39 голосов
/ 09 апреля 2015

, если вы используете rabbitMQ, используйте это в терминале:

sudo rabbitmqctl list_queues

выведет список очередей с количеством ожидающих задач.например:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

число в правом столбце - это количество задач в очереди.выше, у очереди сельдерея есть 166 ожидающих задач.

15 голосов
/ 15 апреля 2017

Если вы не используете приоритетные задачи, это на самом деле довольно просто , если вы используете Redis.Чтобы получить значение задачи:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

Но в приоритетных задачах используется другой ключ в redis , поэтому полная картина немного сложнее.Полная картина такова, что вам нужно запросить redis для каждого приоритета задачи.В python (и из проекта Flower) это выглядит так:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

Если вы хотите получить реальное задание, вы можете использовать что-то вроде:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

Оттуда выПридется десериализовать возвращенный список.В моем случае я смог сделать это с помощью чего-то вроде:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

Просто предупреждаю, что десериализация может занять некоторое время, и вам нужно будет настроить указанные выше команды для работы с различными приоритетами.

10 голосов
/ 19 октября 2013

Чтобы получить задачи из бэкэнда, используйте это

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
6 голосов
/ 05 мая 2018

Решение для копирования и вставки Redis с сериализацией JSON:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

Работает с Django.Только не забудьте поменять yourproject.celery.

4 голосов
/ 30 августа 2016

Модуль проверки сельдерея, кажется, знает о задачах только с точки зрения рабочих. Если вы хотите просмотреть сообщения, которые находятся в очереди (которые еще не были извлечены рабочими), я предлагаю использовать pyrabbit , который может взаимодействовать с rabbitmq http api для получения всех видов информации из очереди .

Пример можно найти здесь: Получить длину очереди с помощью сельдерея (RabbitMQ, Django)

3 голосов
/ 13 апреля 2011

Я думаю, что единственный способ получить задачи, которые ждут, - это сохранить список задач, которые вы запустили, и позволить этой задаче удалить себя из списка при запуске.

С rabbitmqctl и list_queues вы можете получитьобзор того, сколько задач ожидает, но не самих задач: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

Если то, что вы хотите, включает в себя задачу, которая обрабатывается, но еще не завершена, вы можете сохранить список ваших задач и проверитьих состояния:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

Или вы позволяете Celery сохранять результаты с помощью CELERY_RESULT_BACKEND и проверять, каких из ваших задач там нет.

2 голосов
/ 11 апреля 2019

Если вы используете Celery + Django самый простой способ проверять задачи, используя команды непосредственно с вашего терминала в вашей виртуальной среде или полный путь к сельдерею :

Документ : http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect inspect
$ celery inspect registered
$ celery inspect scheduled

Также, если вы используете Celery + RabbitMQ , вы можете просмотреть список очередей , используя следующую команду:

Подробнее : https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues
2 голосов
/ 04 мая 2018

Насколько я знаю, Celery не предоставляет API для проверки задач, ожидающих в очереди. Это зависит от брокера. Если вы используете Redis в качестве посредника для примера, то проверка задач, ожидающих в очереди celery (по умолчанию), так же проста:

  1. подключиться к базе данных брокера
  2. элементы списка в списке celery (например, команда LRANGE)

Имейте в виду, что это задачи, ожидающие выбора доступных рабочих. В вашем кластере могут быть запущены некоторые задачи - их не будет в этом списке, поскольку они уже выбраны.

1 голос
/ 16 ноября 2017

Я пришел к выводу, что лучший способ получить количество заданий в очереди - использовать rabbitmqctl, как было предложено здесь несколько раз. Чтобы позволить любому выбранному пользователю запускать команду с sudo, я следовал инструкциям здесь (я пропустил редактирование части профиля, так как не против набрать sudo перед командой.)

Я также взял фрагмент jamesc grep и cut и обернул его в вызовы подпроцесса.

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...