Что такое хороший алгоритм ограничения скорости? - PullRequest
141 голосов
/ 20 марта 2009

Я мог бы использовать какой-нибудь псевдокод или, лучше, Python. Я пытаюсь реализовать очередь ограничения скорости для бота Python IRC, и она частично работает, но если кто-то запускает меньше сообщений, чем предел (например, ограничение скорости составляет 5 сообщений в 8 секунд, а человек запускает только 4), и следующий триггер более 8 секунд (например, 16 секунд спустя), бот отправляет сообщение, но очередь заполняется, и бот ждет 8 секунд, даже если это не нужно, так как истек 8-секундный период.

Ответы [ 10 ]

211 голосов
/ 21 марта 2009

Здесь самый простой алгоритм , если вы хотите просто отбрасывать сообщения, когда они приходят слишком быстро (вместо того, чтобы ставить их в очередь, что имеет смысл, поскольку очередь может стать произвольно большой):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

В этом решении нет структур данных, таймеров и т. Д., И оно работает чисто :). Чтобы увидеть это, «надбавка» растет со скоростью не более 5/8 единиц в секунду, то есть не более пяти единиц за восемь секунд. Каждое пересылаемое сообщение удерживает одну единицу, поэтому вы не можете отправлять более пяти сообщений каждые восемь секунд.

Обратите внимание, что rate должно быть целым числом, то есть без ненулевой десятичной части, иначе алгоритм не будет работать правильно (фактическая скорость не будет rate/per). Например. rate=0.5; per=1.0; не работает, потому что allowance никогда не вырастет до 1,0. Но rate=1.0; per=2.0; отлично работает.

43 голосов
/ 20 марта 2009

Используйте этот декоратор @RateLimited (ratepersec) перед вашей функцией, которая ставит в очередь.

По сути, это проверяет, прошла ли 1 / скорость в секундах с прошлого раза, и если нет, ждет оставшуюся часть времени, в противном случае он не ждет. Это эффективно ограничивает вас, чтобы оценить / сек. Декоратор может быть применен к любой функции, которую вы хотите ограничить по скорости.

В вашем случае, если вы хотите максимум 5 сообщений в 8 секунд, используйте @RateLimited (0.625) перед функцией sendToQueue.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)
23 голосов
/ 20 марта 2009

Token Bucket довольно прост в реализации.

Начните с ведра с 5 жетонами.

Каждые 5/8 секунд: если в ведре меньше 5 жетонов, добавьте один.

Каждый раз, когда вы хотите отправить сообщение: если в корзине есть токен ≥1, выньте один токен и отправьте сообщение. В противном случае, подождите / отбросьте сообщение / что угодно.

(очевидно, в реальном коде вы использовали бы целочисленный счетчик вместо реальных токенов, и вы можете оптимизировать каждый шаг 5/8 с помощью сохранения временных меток)


Повторное чтение вопроса: если ограничение скорости полностью сбрасывается каждые 8 ​​секунд, то здесь есть модификация:

Начните с отметки времени, last_send, во время давным-давно (например, в эпоху). Кроме того, начните с того же ведра с 5 токенами.

Применяйте правило каждые 5/8 секунд.

Каждый раз, когда вы отправляете сообщение: во-первых, проверьте, если last_send ≥ 8 секунд назад. Если так, заполните ведро (установите это к 5 жетонам). Во-вторых, если в корзине есть токены, отправьте сообщение (в противном случае отбросьте / подождите / и т.д.). В-третьих, установите last_send на сейчас.

Это должно работать для этого сценария.


На самом деле я написал IRC-бота, используя такую ​​стратегию (первый подход). Это на Perl, а не на Python, но вот некоторый код для иллюстрации:

Первая часть здесь посвящена добавлению токенов в ведро. Вы можете увидеть оптимизацию добавления токенов на основе времени (от 2-й до последней строки), а затем последняя строка ограничивает содержимое сегмента до максимума (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn - это структура данных, которая передается. Это внутри метода, который выполняется регулярно (он рассчитывает, когда в следующий раз ему будет что-то делать, и спит либо столько времени, либо пока он не получит сетевой трафик). Следующая часть метода обрабатывает отправку. Это довольно сложно, потому что сообщения имеют приоритеты, связанные с ними.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

Это первая очередь, которая запускается несмотря ни на что. Даже если это уничтожит нашу связь из-за наводнения. Используется для чрезвычайно важных вещей, например, для ответа на PING сервера. Далее остальные очереди:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Наконец, состояние сегмента сохраняется обратно в структуру данных $ conn (на самом деле чуть позже в методе; сначала он вычисляет, как скоро будет больше работы)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Как видите, реальный код обработки сегмента очень мал - около четырех строк. Остальная часть кода является приоритетной обработкой очереди. У бота есть приоритетные очереди, так что, например, кто-то, общаясь с ним, не может помешать ему выполнять свои важные обязанности по кик / бану.

9 голосов
/ 20 июня 2011

для блокировки обработки до тех пор, пока сообщение не может быть отправлено, таким образом, помещая в очередь дальнейшие сообщения, красивое решение antti также можно изменить следующим образом:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

он просто ждет, пока не будет достаточно разрешения для отправки сообщения. чтобы не начинать с двухкратной ставки, пособие также может быть инициализировано с 0.

2 голосов
/ 20 марта 2009

Сохраните время отправки последних пяти строк. Удерживайте сообщения в очереди до тех пор, пока пятое самое последнее сообщение (если оно существует) не пройдет как минимум 8 секунд в прошлом (с last_five как массивом раз):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()
2 голосов
/ 20 марта 2009

Одним из решений является прикрепление метки времени к каждому элементу очереди и удаление элемента по истечении 8 секунд. Вы можете выполнять эту проверку каждый раз, когда очередь добавляется.

Это работает, только если вы ограничите размер очереди до 5 и откажетесь от любых добавлений, пока очередь заполнена.

1 голос
/ 20 октября 2016

Просто реализация Python кода из принятого ответа.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler
1 голос
/ 10 июня 2013

Если кому-то все еще интересно, я использую этот простой вызываемый класс в сочетании с временным хранилищем значений ключа LRU, чтобы ограничить частоту запросов для IP. Использует deque, но может быть переписан для использования со списком.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")
0 голосов
/ 14 октября 2016

Мне нужен был вариант в Scala. Вот оно:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Вот как это можно использовать:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}
0 голосов
/ 02 июня 2009

Как насчет этого:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}
...