Rx Java: пул потоков для сетевых вызовов - PullRequest
0 голосов
/ 25 февраля 2020

Я пытаюсь создать механизм ограничения количества одновременных сетевых запросов. Идея состоит в том, что я хочу иметь фиксированный пул потоков, скажем, из 20 потоков, и использовать этот пул, чтобы разрешить максимум 20 исходящих HTTP-запросов.

То, что я пытался сделать, это:

public class HttpClient {
  private final Scheduler scheduler;

  public HttpClient(int maxRequests) {
    this.scheduler = Schedulers.from(Executors.newFixedThreadPool(maxRequests));
  }

  public Single<...> request() {
    return this.httpRequest()
      .subscribeOn(this.scheduler);
  }

  // sends the http request and returns a response
  private Single<...> httpRequest() {
    return ...
  }
}

Но это не работает. Я попытался установить maxRequests равным только 1, отправив 5 запросов, а затем установив точку останова на сервере, который получает запросы специально, чтобы первый запрос «застрял» там, чтобы увидеть, если другие 4 ждать доступной темы. Но все 5 из них выполняются, и через некоторое время я просто получаю исключение тайм-аута для всех 5 запросов.

Я тоже пытался использовать observeOn, но он тоже не работал.

РЕДАКТИРОВАТЬ: Я также попытался реализовать логику семафора c со следующим кодом:


public HttpClient(int maxRequests) {
  this.concurrentRequestsSemaphore = new Semaphore(maxRequests, true);
}

public Single<...> request() {
  return Completable.fromAction(concurrentRequestsSemaphore::acquireUninterruptibly)
   .andThen(this.httpRequest())
   .doFinally(concurrentRequestsSemaphore::release);
}

Где Semaphore - нативная Java реализация семафора. Этот механизм c работал, как и ожидалось, если бы maxRequests равнялся 2, и я отправил 5 запросов, 2 выдал бы go, а остальные 3 застряли бы внутри fromAction в ожидании. Но этот подход сопровождался другим неожиданным поведением, таким как тот факт, что даже после того, как 2 запроса получили ответ, ни один из остальных 3 не был выполнен, потому что .doFinally(concurrentRequestsSemaphore::release) никогда не выполнялся. Я провел несколько тестов, и он выполнялся только после того, как X-запросы получили ответ. И это было совершенно непредсказуемо, что X собирался быть. Таким образом, может быть семафор из 20 разрешений, 20 запросов будут go выдавать и возвращать ответ, и никакие другие не будут выполняться, потому что семафор никогда не освобождается ни одним запросом.

1 Ответ

1 голос
/ 25 февраля 2020

Вы не показали тело private Single<...> httpRequest(). Я предполагаю, что вы вызываете там какой-то асинхронный метод. Асинхронные методы занимают потоки только для обработки ответа, и когда сам запрос перемещается на сервер и обратно, ни один поток не используется. Это объясняет, почему вы видите все 5 запросов, поступивших на сервер. Обычно, чтобы ограничить количество видов деятельности, используются java .util.concurrent.Semaphore s, но они ограничивают действия, блокируя поток. Логически, поскольку ваша программа асинхронная, вам нужно использовать асинхронный семафор, но это редкий зверь. Таким образом, у вас есть следующие опции:

  • вообще не ограничивают количество асинхронных http-запросов, так как они все равно не занимают много ресурсов
  • запускают специальный поток, который получает разрешения от обычных синхронный семафор, а затем запускает асинхронные HTTP-запросы. Семафор освобождается, когда запрос полностью завершен
  • использовать синхронный запуск http-запросов с фиксированным пулом потоков
  • использовать асинхронный семафор. Единственные известные мне реализации находятся в моей библиотеке DF4J : AsyncSemaphore является расширением стандартного семафора и поэтому имеет как синхронный, так и асинхронный интерфейс, и InpSignal используется только в асинхронных программах. Пример использования InpSignal : AsyncServerSocketChannel. java, где он используется для ограничения количества открытых клиентских подключений в реализации Echo Server.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...