Я пытаюсь создать механизм ограничения количества одновременных сетевых запросов. Идея состоит в том, что я хочу иметь фиксированный пул потоков, скажем, из 20 потоков, и использовать этот пул, чтобы разрешить максимум 20 исходящих HTTP-запросов.
То, что я пытался сделать, это:
public class HttpClient {
private final Scheduler scheduler;
public HttpClient(int maxRequests) {
this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
}
public Single<...> request() {
return this.httpRequest()
.subscribeOn(this.scheduler);
}
// sends the http request and returns a response
private Single<...> httpRequest() {
return ...
}
}
Но это не работает. Я попытался установить maxRequests
равным только 1, отправив 5 запросов, а затем установив точку останова на сервере, который получает запросы специально, чтобы первый запрос «застрял» там, чтобы увидеть, если другие 4 ждать доступной темы. Но все 5 из них выполняются, и через некоторое время я просто получаю исключение тайм-аута для всех 5 запросов.
Я тоже пытался использовать observeOn
, но он тоже не работал.
РЕДАКТИРОВАТЬ: Я также попытался реализовать логику семафора c со следующим кодом:
public HttpClient(int maxRequests) {
this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}
public Single<...> request() {
return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
.andThen(this.httpRequest())
.doFinally(concurrentRequestsSemaphore::release);
}
Где Semaphore
- нативная Java реализация семафора. Этот механизм c работал, как и ожидалось, если бы maxRequests
равнялся 2, и я отправил 5 запросов, 2 выдал бы go, а остальные 3 застряли бы внутри fromAction
в ожидании. Но этот подход сопровождался другим неожиданным поведением, таким как тот факт, что даже после того, как 2 запроса получили ответ, ни один из остальных 3 не был выполнен, потому что .doFinally(concurrentRequestsSemaphore::release)
никогда не выполнялся. Я провел несколько тестов, и он выполнялся только после того, как X-запросы получили ответ. И это было совершенно непредсказуемо, что X собирался быть. Таким образом, может быть семафор из 20 разрешений, 20 запросов будут go выдавать и возвращать ответ, и никакие другие не будут выполняться, потому что семафор никогда не освобождается ни одним запросом.