Ограничение скорости сельдерея: Можно ли по-разному ограничить задачу сельдерея в зависимости от параметра времени выполнения? - PullRequest
1 голос
/ 07 мая 2019

Я бы хотел ограничить скорость задания Celery на основе определенных параметров, которые определяются во время выполнения. Например: если параметр равен 1, ограничение скорости может быть 100. Если параметр равно 2, ограничение скорости может быть 25. Кроме того, я хотел бы иметь возможность изменять эти ограничения скорости во время выполнения.

Предоставляет ли сельдерей способ сделать это? Я мог бы использовать routing_key для отправки задач в разные очереди в зависимости от параметра, но похоже, что celery не поддерживает ограничение скорости на уровне очереди.

Одним из возможных решений было бы использование eta при постановке в очередь задачи, но мне было интересно, есть ли лучший способ добиться этого.

1 Ответ

0 голосов
/ 08 мая 2019

Celery предоставляет встроенную систему ограничения скорости, но она не работает так, как большинство людей ожидает, что система ограничения скорости будет работать, и у нее есть несколько ограничений.Я реализовал распределенную систему ограничения скорости на основе ETA, как вы упомянули, и некоторых сценариев Lua на Redis, она работала довольно хорошо, поэтому я рекомендую этот подход.

В этой статье подробно описан подход, подобный этому:

https://callhub.io/distributed-rate-limiting-with-redis-and-celery/

Я использовал более простую версию, мой скрипт lua был просто так:

local current_time = tonumber(ARGV[1])
local eta = tonumber(redis.call('get', KEYS[1]))
local interval = tonumber(ARGV[2])

if not eta or eta < current_time then
    redis.call('set', KEYS[1], current_time + interval, 'EX', 10800)
    return nil
else
    redis.call('set', KEYS[1], eta + interval, 'EX', 10800)
    return tostring(eta)
end

И мне пришлось просто переопределить задачу apply_async метод и вызовтот сценарий lua с задержкой, которую я хотел:

def apply_async(self, *args, **kwargs):
    now = int(time.time())

    # From django-redis
    conn = get_redis_connection('default')

    cache_key = 'something'

    eta = conn.eval(self.rate_limit_script, 1, cache_key, now, rate_limiter.get_delay())

    if eta:
        eta = datetime.fromtimestamp(float(eta), tz=timezone.get_current_timezone())
        kwargs['eta'] = eta
    return super().apply_async(*args, **kwargs)
...