Параллельные соединения Webflux почему-то ограничены 256 - PullRequest
0 голосов
/ 30 августа 2018

У меня есть простая настройка сервера и клиента:

Flux.range(1, 5000)
        .subscribeOn(Schedulers.parallel())
        .flatMap(i -> WebClient.create()
            .method(HttpMethod.POST)
            .uri("http://localhost:8080/test")
            .body(Mono.just(String.valueOf(i)), String.class)
            .exchange())
        .publishOn(Schedulers.parallel())
        .subscribe(response ->
            response.bodyToMono(String.class)
                .publishOn(Schedulers.elastic())
                .subscribe(body -> log.info("{}", body)));

вот клиент:

@PostMapping
public Mono<String> test(@RequestBody Mono<String> body) {
    return body.delayElement(Duration.ofSeconds(5));
}

Обе вещи работают на нетти. Может быть, у кого-то есть идея, что вызывает такое поведение?

1 Ответ

0 голосов
/ 30 августа 2018

Это не связано с ограничением WebClient в отношении пулов соединений, но на самом деле это связано с деталями реализации Reactor, которые вы можете изменить.

По умолчанию операторы Reactor, такие как flatMap, имеют prefetch=32 (количество элементов, которые мы запрашиваем до того, как конечный подписчик запросит их), и maxConcurrency=256 (максимальное количество элементов, одновременно обрабатываемых оператором).

Вы можете использовать варианты Flux.flatMap(Function mapper, int concurrency, int prefetch), чтобы изменить это поведение.

Ваш фрагмент кода использует сочетание subscribeOn и publishOn; Я бы сказал, что если вы выполняете реактивную работу ввода-вывода с этим фрагментом кода, вы не должны пытаться планировать работу на упругом / параллельном планировщике. Удаление этих операторов лучше всего здесь.

...