планировать запросы на основе ограничения скорости - PullRequest
0 голосов
/ 18 июня 2020

У меня есть очередь, заполненная запросами.

У меня есть потребители, которые принимают запросы из очереди, а затем отправляют их некоторым поставщикам.

Каждый провайдер имеет максимальный лимит запросов он может обрабатывать за минуту.

Я хочу, чтобы все сообщения отправлялись поставщикам.

Однако, если поставщик достиг максимальной мощности запросов в минуту, я хочу, чтобы потребитель подождал, пока можно отправить запрос этому провайдеру, и только в это время -> он отправит сообщение провайдеру.

Посмотрев онлайн, я обнаружил, что могу использовать алгоритм Leaky Bucket для системы ограничения скорости таким образом:

  1. Я никогда не проигнорирую сообщение (как в системе ограничения скорости)

  2. Если очередь заполнена и очередь не находится между минутой с этого момента - это означает, что мы достигаем максимальной пропускной способности запросов в минуту -

    • потребитель рассчитает время, необходимое для запуска, с
    • In установить это время вместо первого элемента (вытащит этот элемент из очереди и вставит сам)
    • подождите, пока он только что рассчитал
    • отправьте сообщение

Это звучит как правильное решение для вас, ребята?

код (здесь java):

import java.time.Duration;
import java.time.Instant;
import java.util.Queue;

public class ConsumerWorker implements  Runnable{

    private Queue<Request> queue;
    private Request request;
    private int limit;
    private int interval;

    public ConsumerWorker(Queue<Request> queue, Request request, int limit, int interval) {
        this.queue = queue;
        this.request = request;
        this.limit = limit;
        this.interval = interval;
    }

    @Override
    public void run() {
        if (queue.size() < limit) {
            request.start = Instant.now();
            queue.offer(request);
            System.out.println("Request id:" + request.id + " processed");
        } else {
            Request head = queue.poll();
            long timeBetweenEarlierRequestAndNow = timeDiff(head.start, Instant.now());

            if (timeBetweenEarlierRequestAndNow >= interval) {
                request.start = Instant.now();
                queue.offer(request);
                System.out.println("Request id:" + request.id + " processed");
            } else {
                long secondsToWait = interval - timeBetweenEarlierRequestAndNow;
                request.start = Instant.now().plusSeconds(secondsToWait);
                queue.offer(request);
                try {
                    Thread.sleep(secondsToWait*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Request id:" + request.id + " processed");
            }
        }
    }


    static long timeDiff(Instant start, Instant end) {
        return Duration.between(start, end).toMillis() / 1000;
    }
}

import java.time.Instant;

public class Request {
    Instant start;
    int id;

    Request(Instant s, int i) {
        start = s;
        id = i;
    }
}
...