Как обрабатывать блокировку вызовов при использовании реактора на сервере с JAX-RS? - PullRequest
1 голос
/ 21 июня 2019

Для обработки HTTP-запросов мы должны выполнять блокирующие вызовы (например, вызовы JDBC) как часть процесса на основе Mono/Flux. Наш текущий план выглядит примерно так:

// I renamed getSomething to processJaxrsHttpRequest
CompletionStage<String> processJaxrsHttpRequest(String input) {
  return Mono.just(input)
      .map(in -> process(in))
      .flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
      .flatMap(str -> asyncHttpCall(str))
      .flatMap(str -> Mono.fromCallable(() -> jdbcCall(str)).subscribeOn(fixedScheduler))
      .toFuture();
}

, где fixedScheduler используется одновременно для HTTP-запросов.

Мы надеялись получить некоторую обратную связь об этой стратегии для обработки блочных вызовов в приличном количестве потоков. Конечно, мы понимаем, что если бы все наши запросы проходили через эти блокирующие вызовы, то мы могли бы также не использовать реактор (за пределами общепризнанного API обработки).

Обновление: спасибо bsideup за этот ответ . Тем не менее, я должен был быть немного более конкретным с моими вопросами.

Мой общий вопрос: как эффективно использовать блокирующий вызов для нескольких потоков, если эти потоки можно создавать / подписывать в больших количествах. Мы попробовали предложенный подход , но он привел к взрыву потоков и быстрому OOM. Итак, мы думаем использовать общий планировщик. Итак ... вот мои вопросы.

  1. Используете ли вы общий планировщик (fixedScheduler), что вы предлагаете в описываемой мной ситуации? Если нет, вы указали бы мне какие-либо направления?
  2. Если использование общего планировщика хорошо, это будет хорошей реализацией этого: Schedulers.newParallel("blocking-scheduler", maxNumThreads)?

Обновление 2: просто выкопайте большую цифру на Schedulers#newParallel и поймите, что это не сработает, поскольку она «отклоняет» блокировку вызовов.

Очень ценю любые советы!

1 Ответ

0 голосов
/ 21 июня 2019

Хотя subscribeOn действительно является одним из способов обработки блокировки вызовов, и с вашим использованием все в порядке, вы также можете использовать publishOn.
Он перемещает обработку в предоставленное Scheduler, если не указано другое publishOn в указанном:

CompletionStage<String> getSomething(String input) {
  return Mono.just(input)
      .map(in -> process(in)) // process must be non-blocking, or go after publishOn
      .publishOn(fixedScheduler)
      .map(::jdbcCall)
      .flatMap(str -> asyncHttpCall(str))
      .publishOn(fixedScheduler)
      .map(::jdbcCall)
      .toFuture();
}

Как видите, вы также можете продолжать использовать асинхронные вызовы.Просто убедитесь, что вы не блокируете неблокирующие планировщики (в этом примере я снова использую publishOn после flatMap, поскольку asyncHttpCall может завершиться из неблокирующего планировщика)

...