Предполагается, что существует стороннее хранилище, которое возвращает Mono<Tuple2<String, Flux<MyObject>>>
из ThirdPartyRepository.findById(batchId)
Каков наилучший способ обогащения MyObject
?
// a bit of descriptive code
public interface MyObjectProcessor {
public Mono<Tuple2<String, Flux<MyObject>>> getMyObject(Batch batchId)
MyObject buildMyObject(MyObject obj, List<A> a, List<B> b, List<C> c);
}
Во время обработки getMyObject
, значения A
, B
и C
будут извлечены из потоков service1, service2 and service3.
Эти значения используются для обогащения Flux<MyObject>
, извлеченного из сторонней библиотеки.
Я быМне нравится манипулировать MyObject
реактивным способом, и это текущее состояние кода.
public Mono<Tuple2<String, Flux<MyObject>>> getMyObject(Batch batchId) {
return ThirdPartyRepository
.findById(batchId)
.flatMap(result ->
Mono.just(Tuples.of(result.getT1(),
result.getT2()
.flatMap(item -> service1.fetchA(item.getA()).collectList() //
.zipWith(service2.fetchB(item.getB(), batchId).collectList()) // <--- block is slow
.zipWith(service3.fetchC(item.getC(), batchId).collectList()) //
.map(tuple -> buildMyObject(item, tuple.getT1().getT1(), //
tuple.getT1().getT2(), //
tuple.getT2())) //
))));
}
Моя проблема заключается в том, что отображение выполняется очень медленно, поскольку все сервисы1, сервис2 и сервис3 имеют задержку. Это приводит к медленной работе всего процессора.
Кажется, что каждый элемент MyObject обрабатывается один за другим на карте потока. Что я могу сделать, чтобы ускорить весь процесс? Вот суть моей проблемы, скажем, Flux<MyObject>
содержит 15 элементов, и скажем, service1 занимает 2 секунды для обработки, затем весь процесс занимает около 14 секунд.
Я думал, что использование реактивного ускорит вещинемного, но это не так. Может кто-нибудь сказать, пожалуйста, как я могу улучшить производительность?