Вызов метода регулирования на M запросов за N секунд - PullRequest
123 голосов
/ 10 сентября 2009

Мне нужен компонент / класс, который ограничивает выполнение какого-либо метода до максимального количества M-вызовов за N секунд (или мс или нанос, не имеет значения).

Другими словами, мне нужно убедиться, что мой метод выполняется не более M раз в скользящем окне из N секунд.

Если вы не знаете существующий класс, не стесняйтесь публиковать свои решения / идеи, как бы вы это реализовали.

Ответы [ 13 ]

76 голосов
/ 10 сентября 2009

Я бы использовал кольцевой буфер временных меток с фиксированным размером M. Каждый раз, когда вызывается метод, вы проверяете самую старую запись, и если в прошлом она была меньше N секунд, вы выполнить и добавить еще одну запись, иначе вы спите из-за разницы во времени.

74 голосов
/ 20 июля 2013

Для меня сработало Google Guava RateLimiter .

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}
28 голосов
/ 10 сентября 2009

Конкретно, вы должны быть в состоянии реализовать это с DelayQueue. Инициализируйте очередь с экземплярами M Delayed с задержкой, изначально установленной на ноль. Когда поступают запросы к методу, take токен, который вызывает блокировку метода до тех пор, пока не будет выполнено требование регулирования. Когда токен был взят, add новый токен в очереди с задержкой N.

20 голосов
/ 10 сентября 2009

Читать по алгоритму Token bucket . По сути, у вас есть ведро с жетонами. Каждый раз, когда вы выполняете метод, вы берете токен. Если токенов больше нет, вы блокируете их, пока не получите. Между тем, существует некоторый внешний субъект, который пополняет токены с фиксированным интервалом.

Мне не известна библиотека для этого (или что-то подобное). Вы можете написать эту логику в своем коде или использовать AspectJ для добавления поведения.

5 голосов
/ 03 июня 2017

Если вам нужен ограничитель скорости скользящего окна на основе Java, который будет работать в распределенной системе, вы можете взглянуть на проект https://github.com/mokies/ratelimitj.

Конфигурация с поддержкой Redis, ограничивающая количество запросов по IP до 50 в минуту, будет выглядеть так:

import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

См. https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis для получения дополнительной информации о конфигурации Redis.

5 голосов
/ 04 декабря 2012

Это зависит от приложения.

Представьте себе случай, когда несколько потоков хотят, чтобы токен совершил какое-то глобально ограниченное по скорости действие с без всплеска (т.е. вы хотите ограничить 10 действий в 10 секунд, но вы не хотите, чтобы 10 действий происходили в первую секунду, а затем оставались на 9 секунд остановленными).

У DelayedQueue есть недостаток: порядок, в котором потоки запрашивают токены, может не соответствовать порядку, в котором они получают свой запрос. Если несколько потоков заблокированы в ожидании токена, неясно, какой из них возьмет следующий доступный токен. С моей точки зрения, у вас даже могут быть темы, ожидающие вечно.

Одним из решений является минимальный интервал времени между двумя последовательными действиями и выполнение действий в том порядке, в котором они были запрошены.

Вот реализация:

public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    //holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) throws Exception {
        if(maxRate <= 0.0f) {
            throw new Exception("Invalid rate");
        }
        this.maxRate = maxRate;
        this.minTime = (long)(1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        //calculate when can we do the action
        synchronized(this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if(timeLeft > 0) {
                lastSchedAction += minTime;
            }
            else {
                lastSchedAction = curTime;
            }
        }

        //If needed, wait for our time
        if(timeLeft <= 0) {
            return;
        }
        else {
            Thread.sleep(timeLeft);
        }
    }
}
3 голосов
/ 10 сентября 2009

Хотя это не то, что вы просили, ThreadPoolExecutor, который предназначен для ограничения M одновременных запросов вместо M запросов в N секунд, также может быть полезным.

2 голосов
/ 24 мая 2012

Я реализовал простой алгоритм регулирования. Попробуйте эту ссылку, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html

Краткое описание алгоритма,

Этот алгоритм использует возможности Java Задержка очереди . Создайте задержанный объект с ожидаемой задержкой (здесь 1000 / M для миллисекунды TimeUnit ). Поместите тот же объект в отложенную очередь, которая будет интерном для нас. Затем перед каждым вызовом метода take объект формирует очередь, take является блокирующим вызовом, который будет возвращаться только после указанной задержки, и после вызова метода не забудьте поместить объект в очередь с обновленным время (здесь текущие миллисекунды).

Здесь мы также можем иметь несколько задержанных объектов с различной задержкой. Этот подход также обеспечит высокую пропускную способность.

1 голос
/ 26 марта 2019

Моя реализация ниже может обрабатывать произвольную точность времени запроса, она имеет O (1) временную сложность для каждого запроса, не требует никакого дополнительного буфера, например O (1) сложность пространства, кроме того, для освобождения токена не требуется фоновый поток, вместо этого токены высвобождаются в соответствии с временем, прошедшим с момента последнего запроса.

class RateLimiter {
    int limit;
    double available;
    long interval;

    long lastTimeStamp;

    RateLimiter(int limit, long interval) {
        this.limit = limit;
        this.interval = interval;

        available = 0;
        lastTimeStamp = System.currentTimeMillis();
    }

    synchronized boolean canAdd() {
        long now = System.currentTimeMillis();
        // more token are released since last request
        available += (now-lastTimeStamp)*1.0/interval*limit; 
        if (available>limit)
            available = limit;

        if (available<1)
            return false;
        else {
            available--;
            lastTimeStamp = now;
            return true;
        }
    }
}
0 голосов
/ 12 декабря 2018

Вот немного продвинутая версия простого ограничителя скорости

/**
 * Simple request limiter based on Thread.sleep method.
 * Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
 * If the limit is exceeded cosume method locks and waits for current call rate to fall down below the limit
 */
public class RequestRateLimiter {

    private long minTime;

    private long lastSchedAction;
    private double avgSpent = 0;

    ArrayList<RatePeriod> periods;


    @AllArgsConstructor
    public static class RatePeriod{

        @Getter
        private LocalTime start;

        @Getter
        private LocalTime end;

        @Getter
        private float maxRate;
    }


    /**
     * Create request limiter with maxRate - maximum number of requests per second
     * @param maxRate - maximum number of requests per second
     * @return
     */
    public static RequestRateLimiter create(float maxRate){
        return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0),
                LocalTime.of(23,59,59), maxRate)));
    }

    /**
     * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
     * @param ratePeriods - rate calendar
     * @return
     */
    public static RequestRateLimiter create(List<RatePeriod> ratePeriods){
        return new RequestRateLimiter(ratePeriods);
    }

    private void checkArgs(List<RatePeriod> ratePeriods){

        for (RatePeriod rp: ratePeriods ){
            if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end )
                throw new IllegalArgumentException("list contains null or rate is less then zero or period is zero length");
        }
    }

    private float getCurrentRate(){

        LocalTime now = LocalTime.now();

        for (RatePeriod rp: periods){
            if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) )
                return rp.maxRate;
        }

        return Float.MAX_VALUE;
    }



    private RequestRateLimiter(List<RatePeriod> ratePeriods){

        checkArgs(ratePeriods);
        periods = new ArrayList<>(ratePeriods.size());
        periods.addAll(ratePeriods);

        this.minTime = (long)(1000.0f / getCurrentRate());
        this.lastSchedAction = System.currentTimeMillis() - minTime;
    }

    /**
     * Call this method before making actual request.
     * Method call locks until current rate falls down below the limit
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {

        long timeLeft;

        synchronized(this) {
            long curTime = System.currentTimeMillis();

            minTime = (long)(1000.0f / getCurrentRate());
            timeLeft = lastSchedAction + minTime - curTime;

            long timeSpent = curTime - lastSchedAction + timeLeft;
            avgSpent = (avgSpent + timeSpent) / 2;

            if(timeLeft <= 0) {
                lastSchedAction = curTime;
                return;
            }

            lastSchedAction = curTime + timeLeft;
        }

        Thread.sleep(timeLeft);
    }

    public synchronized float getCuRate(){
        return (float) ( 1000d / avgSpent);
    }
}

И юнит-тесты

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestRateLimiterTest {


    @Test(expected = IllegalArgumentException.class)
    public void checkSingleThreadZeroRate(){

        // Zero rate
        RequestRateLimiter limiter = RequestRateLimiter.create(0);
        try {
            limiter.consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void checkSingleThreadUnlimitedRate(){

        // Unlimited
        RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) < 1000));
    }

    @Test
    public void rcheckSingleThreadRate(){

        // 3 request per minute
        RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 3; i++ ){

            try {
                limiter.consume();
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( ((ended - started) >= 60000 ) & ((ended - started) < 61000));
    }



    @Test
    public void checkSingleThreadRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);

        long started = System.currentTimeMillis();
        for ( int i = 0; i < 1000; i++ ){

            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ));
    }

    @Test
    public void checkMultiThreadedRateLimit(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreaded32RateLimit(){

        // 0,2 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(8);
        ExecutorService exec = Executors.newFixedThreadPool(8);

        for ( int i = 0; i < 8; i++ ) {

            tasks.add( exec.submit(() -> {
                for (int i1 = 0; i1 < 2; i1++) {

                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

    @Test
    public void checkMultiThreadedRateLimitDynamicRate(){

        // 100 request per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for ( int i = 0; i < 10; i++ ) {

            tasks.add( exec.submit(() -> {

                Random r = new Random();
                for (int i1 = 0; i1 < 100; i1++) {

                    try {
                        limiter.consume();
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }) );
        }

        tasks.stream().forEach( future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println( "Current rate:" + limiter.getCurRate() );
        Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
    }

}
...