Как обернуть Flux с помощью операции блокировки в подписке? - PullRequest
0 голосов
/ 29 августа 2018

В документации написано, что вы должны заключить код блокировки в Mono: http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

Но не написано, как на самом деле это сделать.

У меня есть следующий код:

@PostMapping(path = "some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doeSomething(@Valid @RequestBody Flux<Something> something) {
    something.subscribe(something -> {
        // some blocking operation
    });

    // how to return Mono<Void> here?
}

Первая проблема, с которой я столкнулся, заключается в том, что мне нужно что-то вернуть, но я не могу. Если бы я, например, вернул Mono.empty, запрос был бы закрыт до того, как работа потока завершена.

Вторая проблема: как мне на самом деле обернуть блокирующий код, как это предлагается в документации:

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic()); 

1 Ответ

0 голосов
/ 29 августа 2018

Вы не должны вызывать subscribe в обработчике контроллера, а просто построить реактивный конвейер и вернуть его. В конечном счете, HTTP-клиент будет запрашивать данные (через механизм Spring WebFlux), и именно это подписывает и запрашивает данные в конвейере.

Подписка вручную отсоединит обработку запроса от этой другой операции, что 1) снимет любую гарантию порядка операций и 2) прервет обработку, если эта другая операция использует ресурсы HTTP (например, тело запроса).

В этом случае источник не блокирует, а выполняет только операцию преобразования. Поэтому нам лучше использовать publishOn, чтобы сигнализировать, что остальная часть цепочки должна быть выполнена на определенном планировщике. Если операция здесь связана с вводом / выводом, тогда Scheduler.elastic() - лучший выбор, если он связан с процессором, тогда лучше Scheduler.paralell. Вот пример:

@PostMapping(path = "/some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doSomething(@Valid @RequestBody Flux<Something> something) {

    return something.collectList()
      .publishOn(Scheduler.elastic())
      .map(things -> { 
         return processThings(things);
      })
      .then();        
}

public ProcessingResult processThings(List<Something> things) {
  //...
}

Для получения дополнительной информации по этой теме, ознакомьтесь с разделом Планировщик в документации по реактору . Если ваше приложение имеет тенденцию делать много подобных вещей, вы теряете много преимуществ реактивных потоков и можете рассмотреть возможность перехода на модель на основе сервлетов, где вы можете соответствующим образом настроить пулы потоков.

...