У меня есть очередь, заполненная запросами.
У меня есть потребители, которые принимают запросы из очереди, а затем отправляют их некоторым поставщикам.
Каждый провайдер имеет максимальный лимит запросов он может обрабатывать за минуту.
Я хочу, чтобы все сообщения отправлялись поставщикам.
Однако, если поставщик достиг максимальной мощности запросов в минуту, я хочу, чтобы потребитель подождал, пока можно отправить запрос этому провайдеру, и только в это время -> он отправит сообщение провайдеру.
Посмотрев онлайн, я обнаружил, что могу использовать алгоритм Leaky Bucket для системы ограничения скорости таким образом:
Я никогда не проигнорирую сообщение (как в системе ограничения скорости)
Если очередь заполнена и очередь не находится между минутой с этого момента - это означает, что мы достигаем максимальной пропускной способности запросов в минуту -
- потребитель рассчитает время, необходимое для запуска, с
- In установить это время вместо первого элемента (вытащит этот элемент из очереди и вставит сам)
- подождите, пока он только что рассчитал
- отправьте сообщение
Это звучит как правильное решение для вас, ребята?
код (здесь java):
import java.time.Duration;
import java.time.Instant;
import java.util.Queue;
public class ConsumerWorker implements Runnable{
private Queue<Request> queue;
private Request request;
private int limit;
private int interval;
public ConsumerWorker(Queue<Request> queue, Request request, int limit, int interval) {
this.queue = queue;
this.request = request;
this.limit = limit;
this.interval = interval;
}
@Override
public void run() {
if (queue.size() < limit) {
request.start = Instant.now();
queue.offer(request);
System.out.println("Request id:" + request.id + " processed");
} else {
Request head = queue.poll();
long timeBetweenEarlierRequestAndNow = timeDiff(head.start, Instant.now());
if (timeBetweenEarlierRequestAndNow >= interval) {
request.start = Instant.now();
queue.offer(request);
System.out.println("Request id:" + request.id + " processed");
} else {
long secondsToWait = interval - timeBetweenEarlierRequestAndNow;
request.start = Instant.now().plusSeconds(secondsToWait);
queue.offer(request);
try {
Thread.sleep(secondsToWait*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Request id:" + request.id + " processed");
}
}
}
static long timeDiff(Instant start, Instant end) {
return Duration.between(start, end).toMillis() / 1000;
}
}
import java.time.Instant;
public class Request {
Instant start;
int id;
Request(Instant s, int i) {
start = s;
id = i;
}
}