подписка не работает при использовании его в bodyToMono ServerRequest - PullRequest
1 голос
/ 11 октября 2019

Я знаю, что метод subscribeOn используется для переключения потока выполнения при подписке последовательности, но я обнаружил, что он не работает с ServerRequest.bodyToMono / Flux

Что-то вроде

Flux.just(1,2,3)
          .doOnNext(integer -> log.info("test {}",integer))
          .subscribeOn(Schedulers.elastic())
          .subscribe();

приведет к изменению потока выполнения

INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 1
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 2
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 3

Но что меня смутило, так это

Скажем, у меня есть маршрутизатор Spring WebFlux:

@Configuration
public class TestRouter {
    @Bean
    public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
        return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
          route -> route.PUT("/", req -> {
              Mono<String> valueMono = req.bodyToMono(String.class);
              return ServerResponse.ok().body(testService.test(valueMono), String.class);
          }))).build();
    }
}

и служба:

@Service
@Slf4j
public class TestService {
    public Mono<String> test(Mono<String> mono) {
        return mono
          .doOnSubscribe(subscription -> log.info("on subscribe"))
          .subscribeOn(Schedulers.elastic())
          .doOnNext(s -> log.info("received {}", s))
          .subscribeOn(Schedulers.elastic());
    }
}

основная логика - HTTP-запрос на локальный хост: порт / тест будет получать то, что отправляет на сервер в виде обычного текста

Я пытаюсь запустить doOnNext в другом потоке, а не в NIO-потоке Spring WebFlux, где бы я ни помещал

subscribeOn

Тема исполнения всегда будет NIO Тема:

INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : received test

Спасибо @MichaelBerry @ SimonBaslé, вы оба мне очень помогаете, upvote оба ваших ответа

Короче говоря, реактор-нетти переопределяет подписку на http-подписку, используйте flatMap(), чтобы включить отдельный subscribeOn() в другой Mono/Flux или publishOn(), который может выполнять ту работу, которую я хочу

1 Ответ

3 голосов
/ 11 октября 2019

Это не то, что вы можете изменить - это только последний subscribeOn() вызов в цепочке перед вызовом subscribe(), что является должным, поэтому WebFlux может использовать любой планировщик, какой захочет. В этом случае похоже, что он обрабатывает запросы в цикле событий, управляемом NIO или чем-то подобном.

Однако вы можете включить в свою цепочку вызов flatMap(), для которого можно указатьотдельный subscribeOn(), который не будет переопределен. Это может быть вариант в зависимости от вашего варианта использования, так как вы можете выполнить большую часть работы в издателе, определенном в вызове flatMap().

...