Проблема производительности в Flux, элементы которого обогащены другим набором Flux - PullRequest
0 голосов
/ 19 октября 2019

Предполагается, что существует стороннее хранилище, которое возвращает Mono<Tuple2<String, Flux<MyObject>>> из ThirdPartyRepository.findById(batchId)

Каков наилучший способ обогащения MyObject?

    // a bit of descriptive code
    public interface MyObjectProcessor {

        public Mono<Tuple2<String, Flux<MyObject>>> getMyObject(Batch batchId)
        MyObject buildMyObject(MyObject obj, List<A> a, List<B> b, List<C> c);
    }

Во время обработки getMyObject, значения A, B и C будут извлечены из потоков service1, service2 and service3. Эти значения используются для обогащения Flux<MyObject>, извлеченного из сторонней библиотеки.

Я быМне нравится манипулировать MyObject реактивным способом, и это текущее состояние кода.

        public Mono<Tuple2<String, Flux<MyObject>>> getMyObject(Batch batchId) {



            return ThirdPartyRepository
                    .findById(batchId)
                    .flatMap(result ->
                            Mono.just(Tuples.of(result.getT1(),
                                    result.getT2()
                                            .flatMap(item -> service1.fetchA(item.getA()).collectList()           //
                                                    .zipWith(service2.fetchB(item.getB(), batchId).collectList()) // <--- block is slow
                                                    .zipWith(service3.fetchC(item.getC(), batchId).collectList()) //
                                                    .map(tuple -> buildMyObject(item, tuple.getT1().getT1(),      //
                                                            tuple.getT1().getT2(),                                //
                                                            tuple.getT2()))                                       //
                                            ))));
        }

Моя проблема заключается в том, что отображение выполняется очень медленно, поскольку все сервисы1, сервис2 и сервис3 имеют задержку. Это приводит к медленной работе всего процессора.

Кажется, что каждый элемент MyObject обрабатывается один за другим на карте потока. Что я могу сделать, чтобы ускорить весь процесс? Вот суть моей проблемы, скажем, Flux<MyObject> содержит 15 элементов, и скажем, service1 занимает 2 секунды для обработки, затем весь процесс занимает около 14 секунд.

Я думал, что использование реактивного ускорит вещинемного, но это не так. Может кто-нибудь сказать, пожалуйста, как я могу улучшить производительность?

...