Spring boot + webflux: контекст теряется при параллельном выполнении нескольких шагов - PullRequest
4 голосов
/ 10 апреля 2019

Spring boot: 2.1.3.RELEASE

Здравствуйте,

Я пытаюсь использовать контекстную функцию Spring Webflux для переноса простой переменной.У меня есть WebFilter, устанавливающий контекст с такой переменной, и я пытаюсь использовать его в моем контроллере на разных этапах моего потока / потока.В какой-то момент я теряю его после вызова метода «parallel ()» класса Flux.

  • Фильтр:
public class TestFilter implements WebFilter {

    private Logger LOG = LoggerFactory.getLogger(TestFilter.class);

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain.filter(exchange)
            .doOnEach(voidSignal -> System.out.println("filter:"+voidSignal.getContext().getOrEmpty("blob"))).subscriberContext(Context.of("blob", "kapoue"));
    }

}
  • Контроллер:
@RestController
@RequestMapping(TestControllerWebFlux.ROOT)
public class TestControllerWebFlux {

    static final String ROOT = "/flux";
    static final String TEST = "/test";

    private WebClient webClient = WebClient.create();

    @GetMapping(
            value = TEST,
            produces = {MediaType.APPLICATION_JSON_VALUE})
    public Mono<String> test() {
        System.out.println("controller1:"+Thread.currentThread());

        Flux<String> call = webClient.get().uri("http://localhost:" + 8080 + ROOT + "/test2").retrieve().bodyToFlux(Result.class).map(Result::getValue);

        return call.map(s -> s+"0")
            .doOnEach(stringSignal -> System.out.println("controller2:"+stringSignal.getContext().getOrEmpty("blob")))
            .parallel()
            .doOnEach(stringSignal -> System.out.println("controller3:"+stringSignal.getContext().getOrEmpty("blob")))
            .map(s -> s+"0")
            .doOnEach(stringSignal -> System.out.println("controller4:"+stringSignal.getContext().getOrEmpty("blob")))
            .reduce((s, s2) -> s+s2)
            .doOnEach(stringSignal -> System.out.println("controller5:"+stringSignal.getContext().getOrEmpty("blob")))
            .map(s -> {
                System.out.println("controller6:"+Thread.currentThread());
                return s;
            });
    }

    @GetMapping(
        value = "test2",
        produces = {MediaType.APPLICATION_JSON_VALUE})
    public Flux<Result> test2() {
        return Flux.just(new Result("0"), new Result("0"), new Result("0"));
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Result {
        private String value;
    }
}

Все, что я делаю, вызывает конечную точку http://localhost:8080/flux/test/, и я получаю это:

controller1: Thread [реактор-http-nio-2,5, основной] контроллер2: необязательный [kapoue] контроллер3: необязательный. Контроллерty4: необязательный.mpty контроллер2: необязательный [капу] контроллер3: необязательный.: Optional.empty controller2: Необязательный [kapoue] controller3: Optional.empty controller4: Optional.empty controller3: Необязательный .empty controller4: Необязательный.controller5: Необязательный [kapoue] controller6: Поток [реактор-http-nio-2,5, основной] фильтр: Необязательный [kapoue]

Как видите, контекст теряется сразу после 'параллельный метод икак-то возвращается после уменьшения.

Это ошибка или я не должен пытаться запускать вещи параллельно после такого вызова, как этот?

Заранее спасибо за помощь.

1 Ответ

1 голос
/ 10 апреля 2019

Это похоже на ошибку в Reactor.Я сообщил об этом: https://github.com/reactor/reactor-core/issues/1656

...