Определить, доступен ли сельдерей - PullRequest
45 голосов
/ 14 декабря 2011

Я использую Сельдерей для управления асинхронными задачами. Однако иногда процесс сельдерея останавливается, что не приводит к выполнению ни одной из задач. Я хотел бы иметь возможность проверить состояние сельдерея и убедиться, что все работает нормально, и если я обнаружу какие-либо проблемы, отобразить сообщение об ошибке для пользователя. Из документации Celery Worker выглядит, что я мог бы использовать ping или inspect для этого, но ping кажется хакерским, и неясно, как именно должен использоваться inspect (если inspect (). register () пусто?).

Любое руководство по этому вопросу будет оценено. В основном я ищу такой метод:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??

РЕДАКТИРОВАТЬ: Он даже не выглядит как зарегистрированный () доступен на сельдерее 2.3.3 (даже если в документации 2.1 это указано). Может быть, пинг правильный ответ.

РЕДАКТИРОВАТЬ: Пинг также, кажется, не делает то, что я думал, что будет делать, так что до сих пор не уверен, что ответ здесь.

Ответы [ 5 ]

56 голосов
/ 15 декабря 2011

Вот код, который я использовал. celery.task.control.Inspect.stats() возвращает диктовку, содержащую множество подробностей о доступных на данный момент рабочих, None, если нет работающих рабочих, или выдает IOError, если не удается подключиться к брокеру сообщений. Я использую RabbitMQ - возможно, что другие системы обмена сообщениями могут вести себя немного иначе. Это работало в Celery 2.3.x и 2.4.x; Я не уверен, как далеко это идет.

def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        d = insp.stats()
        if not d:
            d = { ERROR_KEY: 'No running Celery workers were found.' }
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = { ERROR_KEY: msg }
    except ImportError as e:
        d = { ERROR_KEY: str(e)}
    return d
6 голосов
/ 19 декабря 2018

Из документации по сельдерею 4.2 :

from your_celery_app import app


def get_celery_worker_status():
    i = app.control.inspect()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result

конечно, вы можете / должны улучшить код с обработкой ошибок ...

4 голосов
/ 25 мая 2016

Чтобы проверить то же самое с помощью командной строки в случае, если celery запущен как демон,

  • Активируйте virtualenv и перейдите в каталог, где 'app' -
  • Теперь запустите: celery -A [app_name] status
  • Это покажет, если сельдерей поднялся или нет плюс нет. узлов онлайн

Источник: http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

4 голосов
/ 15 марта 2016

У меня сработало следующее:

import socket
from kombu import Connection

celery_broker_url = "amqp://localhost"

try:
    conn = Connection(celery_broker_url)
    conn.ensure_connection(max_retries=3)
except socket.error:
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))
0 голосов
/ 25 марта 2019

Один из способов проверить, отвечает ли любой рабочий, - отправить широковещательную рассылку и вернуть успешный результат при первом ответе.'пинг' и будет ждать ответов до одной секунды.Как только поступит первый ответ, он вернет результат.Если вы хотите, чтобы результат False был быстрее, вы можете добавить аргумент timeout, чтобы сократить время ожидания, прежде чем сдаться.

...