У меня есть проблема и смутное представление о том, как ее исправить, но я постараюсь переоценить контекст, чтобы избежать проблемы XY .
У меня есть асинхронный метод, который немедленно возвращает гуаву ListenableFuture
, которую мне нужно вызывать сотни тысяч или миллионы раз (самому будущему может потребоваться некоторое время для завершения). Я не могу действительно изменить внутренности этого метода. Это связано с серьезным конфликтом ресурсов, поэтому я бы хотел ограничить количество обращений к этому методу за один раз. Поэтому я попытался использовать Semaphore
:
public class ConcurrentCallsLimiter<In, Out> {
private final Function<In, ListenableFuture<Out>> fn;
private final Semaphore semaphore;
private final Executor releasingExecutor;
public ConcurrentCallsLimiter(
int limit, Executor releasingExecutor,
Function<In, ListenableFuture<Out>> fn) {
this.semaphore = new Semaphore(limit);
this.fn = fn;
this.releasingExecutor = releasingExecutor;
}
public ListenableFuture<Out> apply(In in) throws InterruptedException {
semaphore.acquire();
ListenableFuture<Out> result = fn.apply(in);
result.addListener(() -> semaphore.release(), releasingExecutor);
return result;
}
}
Итак, я могу просто обернуть свой вызов в этом классе и вместо этого вызвать:
ConcurrentLimiter<Foo, Bar> cl =
new ConcurrentLimiter(10, executor, someService::turnFooIntoBar);
for (Foo foo : foos) {
ListenableFuture<Bar> bar = cl.apply(foo);
// handle bar (no pun intended)
}
Этот работает . Проблема в том, что задержка хвоста действительно плохая. Некоторые вызовы становятся «неудачными» и в конечном итоге занимают много времени, пытаясь получить ресурсы в рамках этого вызова метода. Это усугубляется некоторой внутренней логикой экспоненциального отката, так что неудачный вызов получает все меньше и меньше шансов на получение необходимых ресурсов по сравнению с новыми вызовами, которые более нетерпеливы и ждут гораздо меньше времени, прежде чем пытаться снова.
Что было бы идеально, чтобы исправить это, если бы было что-то похожее на этот семафор, но у которого было понятие порядка. Например, если лимит равен 10, то 11-й вызов в настоящее время должен ждать любого из первых 10, чтобы завершиться. Мне бы хотелось, чтобы 11-й звонок ожидал завершения самого первого первого вызова. Таким образом, «несчастливые» звонки не будут истощаться новыми поступающими звонками.
Кажется, что я мог бы назначить целочисленный порядковый номер для вызовов и каким-то образом отследить самый низкий из них, который еще не завершен, но я не мог понять, как это сделать, тем более что на самом деле нет никакого полезного "ожидания" методы на AtomicInteger
или как угодно.