Как контролировать параллелизм Flux.flatMap (Mono)? - PullRequest
0 голосов
/ 08 мая 2020

Приведенный ниже код выполняет все веб-запросы (webClient) параллельно, не соблюдая установленный мной предел parallel(5).

        Flux.fromIterable(dataListWithHundredsElements)
            .parallel(5).runOn(Schedulers.boundedElastic())
            .flatMap(element -> 
                webClient.post().
                .bodyValue(element)
                .retrieve()
                .bodyToMono(String.class)
                .doOnError(err -> element.setError(Utils.toString(err)))
                .doOnSuccess(r -> element.setResponse(r))
            )
            .sequential()
            .onErrorContinue((e, v) -> {})
            .doOnComplete(() -> updateInDatabase(dataListWithHundresdElements))
            .subscribe();

Я хотел бы знать, можно ли выполнять запросы в соответствии с значение, указанное в parallel(5), и как это лучше всего сделать?

Одна деталь, этот код является приложением Spring MVC, которое я делаю запросы для внешней службы.

ОБНОВЛЕНИЕ 01

Фактически Flux создает 5 потоков, однако все запросы (WebClient Mono) выполняются одновременно.

Я хочу, чтобы выполнялось 5 запросов за раз, поэтому, когда заканчивается 1 запрос, запускается другой запрос, но никогда не должно быть более 5 запросов параллельно.

Поскольку Mono также является реактивным типом, мне кажется, что 5 потоков of Flux вызывают его и не блокируются, на практике все запросы выполняются параллельно.

ОБНОВЛЕНИЕ 02 - Журналы внешних служб

Это журнал внешний сервер Vice, на который требуется около 5 секунд. Как вы можете видеть в журналах ниже, 14 запросов одновременно.

2020-05-08 11:53:56.655  INFO 28223 --- [nio-8080-exec-8] EXTERNAL SERVICE LOG {"id": 21} http-nio-8080-exec-8
2020-05-08 11:53:56.655  INFO 28223 --- [nio-8080-exec-7] EXTERNAL SERVICE LOG {"id": 20} http-nio-8080-exec-7
2020-05-08 11:53:56.659  INFO 28223 --- [nio-8080-exec-2] EXTERNAL SERVICE LOG {"id": 27} http-nio-8080-exec-2
2020-05-08 11:53:56.659  INFO 28223 --- [nio-8080-exec-6] EXTERNAL SERVICE LOG {"id": 19} http-nio-8080-exec-6
2020-05-08 11:53:56.659  INFO 28223 --- [io-8080-exec-10] EXTERNAL SERVICE LOG {"id": 23} http-nio-8080-exec-10
2020-05-08 11:53:56.660  INFO 28223 --- [nio-8080-exec-5] EXTERNAL SERVICE LOG {"id": 18} http-nio-8080-exec-5
2020-05-08 11:53:56.660  INFO 28223 --- [nio-8080-exec-9] EXTERNAL SERVICE LOG {"id": 17} http-nio-8080-exec-9
2020-05-08 11:53:56.660  INFO 28223 --- [nio-8080-exec-1] EXTERNAL SERVICE LOG {"id": 29} http-nio-8080-exec-1
2020-05-08 11:53:56.661  INFO 28223 --- [nio-8080-exec-4] EXTERNAL SERVICE LOG {"id": 24} http-nio-8080-exec-4
2020-05-08 11:53:56.666  INFO 28223 --- [io-8080-exec-11] EXTERNAL SERVICE LOG {"id": 25} http-nio-8080-exec-11
2020-05-08 11:53:56.675  INFO 28223 --- [io-8080-exec-13] EXTERNAL SERVICE LOG {"id": 42} http-nio-8080-exec-13
2020-05-08 11:53:56.678  INFO 28223 --- [io-8080-exec-14] EXTERNAL SERVICE LOG {"id": 28} http-nio-8080-exec-14
2020-05-08 11:53:56.680  INFO 28223 --- [io-8080-exec-12] EXTERNAL SERVICE LOG {"id": 26} http-nio-8080-exec-12
2020-05-08 11:53:56.686  INFO 28223 --- [io-8080-exec-15] EXTERNAL SERVICE LOG {"id": 22} http-nio-8080-exec-15

ОБНОВЛЕНИЕ 03 - Журналы Reactor

Укрепление, внешняя служба занимает около 5 секунд на ответ. Однако можно увидеть, что все запросы (14) выполняются почти одновременно.

2020-05-08 11:53:56.051  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.053  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.081  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.081  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.082  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.082  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.093  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.093  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.094  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
2020-05-08 11:53:56.095  INFO 28223 --- [nio-8080-exec-1] reactor.Parallel.RunOn.1                 : request(unbounded)
2020-05-08 11:53:56.110  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onNext(@40ddcd53)
2020-05-08 11:53:56.112  INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1                 : onNext(@200e0819)
2020-05-08 11:53:56.112  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onNext(@3b81eee2)
2020-05-08 11:53:56.113  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onNext(@60af2a4d)
2020-05-08 11:53:56.115  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onNext(@723db553)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onNext(@387743b5)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onNext(@62ed2f8d)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1                 : onNext(@1a40554a)
2020-05-08 11:53:56.442  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onNext(@1bcb696a)
2020-05-08 11:53:56.440  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onNext(@46c98823)
2020-05-08 11:53:56.443  INFO 28223 --- [oundedElastic-3] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.446  INFO 28223 --- [oundedElastic-5] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.442  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onNext(@1c0da4a)
2020-05-08 11:53:56.448  INFO 28223 --- [oundedElastic-2] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.452  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onNext(@14d54d26)
2020-05-08 11:53:56.453  INFO 28223 --- [oundedElastic-4] reactor.Parallel.RunOn.1                 : onComplete()
2020-05-08 11:53:56.490  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onNext(@46e43af)
2020-05-08 11:53:56.492  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onNext(@5ca02355)
2020-05-08 11:53:56.496  INFO 28223 --- [oundedElastic-1] reactor.Parallel.RunOn.1                 : onComplete()

1 Ответ

3 голосов
/ 08 мая 2020

Вы можете использовать метод ParallelFlux#flatMap(Function<? super T,? extends Publisher<? extends R>>, boolean, int) для управления параллелизмом.

Для вашей ситуации это может быть:

        .flatMap(element -> 
            webClient.post().
            .bodyValue(element)
            .retrieve()
            .bodyToMono(String.class)
            .doOnError(err -> element.setError(Utils.toString(err)))
            .doOnSuccess(r -> element.setResponse(r)),
            false, 1
        )

Но на самом деле вам не нужно создавать ParallelFlux . Просто используйте метод Flux#flatMap(Function<? super T,? extends Publisher<? extends V>>, int):

Flux.fromIterable(dataListWithHundredsElements)
        .flatMap(element -> webclient.post()..., 5)
...

Второй аргумент метода flatMap отвечает за параллелизм.

...