Вы не должны вызывать subscribe
в обработчике контроллера, а просто построить реактивный конвейер и вернуть его. В конечном счете, HTTP-клиент будет запрашивать данные (через механизм Spring WebFlux), и именно это подписывает и запрашивает данные в конвейере.
Подписка вручную отсоединит обработку запроса от этой другой операции, что 1) снимет любую гарантию порядка операций и 2) прервет обработку, если эта другая операция использует ресурсы HTTP (например, тело запроса).
В этом случае источник не блокирует, а выполняет только операцию преобразования. Поэтому нам лучше использовать publishOn
, чтобы сигнализировать, что остальная часть цепочки должна быть выполнена на определенном планировщике. Если операция здесь связана с вводом / выводом, тогда Scheduler.elastic()
- лучший выбор, если он связан с процессором, тогда лучше Scheduler.paralell
. Вот пример:
@PostMapping(path = "/some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doSomething(@Valid @RequestBody Flux<Something> something) {
return something.collectList()
.publishOn(Scheduler.elastic())
.map(things -> {
return processThings(things);
})
.then();
}
public ProcessingResult processThings(List<Something> things) {
//...
}
Для получения дополнительной информации по этой теме, ознакомьтесь с разделом Планировщик в документации по реактору . Если ваше приложение имеет тенденцию делать много подобных вещей, вы теряете много преимуществ реактивных потоков и можете рассмотреть возможность перехода на модель на основе сервлетов, где вы можете соответствующим образом настроить пулы потоков.