К сожалению, подход с Semaphore
, предложенный @sbordet, не сработал для меня.Я попробовал это:
var semaphore = semaphores.computeIfAbsent(getRequestKey(request), k -> new Semaphore(MAX_CONCURRENT_REQUESTS_NUMBER));
CompletableFuture.runAsync(semaphore::acquireUninterruptibly, WAITING_POOL)
.thenComposeAsync(ignored -> httpClient.sendAsync(request, responseBodyHandler), ASYNC_POOL)
.whenComplete((response, e) -> semaphore.release());
Нет никакой гарантии, что поток соединения освобождается к тому времени, когда выполнение передается следующему CompletableFuture
, где освобождается семафор.Для меня подход работал в случае нормального выполнения, однако, если есть какие-либо исключения, кажется, что поток соединения может быть закрыт после вызова semaphore.release()
.
Наконец, я закончил с использованием OkHttp .Он решает проблему (он просто ожидает освобождения некоторых потоков, если число одновременных потоков достигает max_concurrent_streams
).Он также обрабатывает кадр GOAWAY
.В случае Java HttpClient
мне пришлось реализовать логику повтора, чтобы справиться с этим, поскольку он просто выдает IOException
, если сервер отправляет GOAWAY
frame.