Сельдерей: не добавляйте больше задач, если слишком много в очереди - PullRequest
1 голос
/ 28 марта 2019

У меня есть Flask REST API, который использует Celery для выполнения асинхронных запросов.

Идея состоит в том, что параметр запроса async=1 указывает, что запрос должен обрабатываться асинхронно (немедленный возврат идентификатора задачикоторый клиент будет использовать позже).

В то же время я хочу запретить принимать новые задачи, когда слишком много ожидающих обработки .

Код нижеработает, но accepting_new_tasks() занимает ~ 2 секунды, что слишком медленно.

Есть ли в Celery конфиг (или что-то еще), позволяющий ограничить количество ожидающих задач;или более быстрый способ получить количество ожидающих заданий?

import math

from celery import Celery
from flask import abort, Flask, jsonify, request


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()
    nodes_reserved = inspector.reserved()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    waiting_tasks = 0
    for reserved in nodes_reserved.values():
        waiting_tasks += len(reserved)

    return waiting_tasks < math.ceil(workers / 3)

1 Ответ

0 голосов
/ 29 марта 2019

В конце концов я решил эту проблему, обратившись к API управления RabbitMQ, указав https://stackoverflow.com/a/27074594/4183498.

import math

from celery import Celery
from flask import abort, Flask, jsonify, request
from requests import get
from requests.auth import HTTPBasicAuth


flask_app = Flask(__name__)
celery_app = Celery("tasks", broker="rabbit...")


def get_workers_count():
    inspector = celery_app.control.inspect()
    nodes_stats = inspector.stats()
    nodes_reserved = inspector.reserved()

    workers = 0
    for stats in nodes_stats.values():
        workers += stats["pool"]["max-concurrency"]

    return workers


WORKERS_COUNT = get_workers_count()


@flask_app.route("/")
def home():
    async_ = request.args.get("async")
    settings = request.args.get("settings")

    if async_:
        if not accepting_new_tasks(celery_app):
            return abort(503)

        task = celery_app.send_task(name="my-task", kwargs={"settings": settings})
        return jsonify({"taskId": task.id})

    return jsonify({})


def accepting_new_tasks(celery_app):WORKERS_COUNT
    auth = HTTPBasicAuth("guest", "guest")
    response = get(
        "http://localhost:15672/api/queues/my_vhost/celery",
         auth=auth
    )
    waiting_tasks = response.json()["messages"]
    return waiting_tasks < math.ceil(WORKERS_COUNT / 3)
...