Мы используем проект-реактор, чтобы извлечь некоторые данные из внешнего веб-сервиса и сгенерировать несколько результирующих объектов.
Сначала нам нужно получить некоторые мастер-данные, которые необходимы для запуска следующих вызовов веб-сервиса. После того, как мастер-данные станут доступны, мы получим еще несколько данных на основе результатов мастер-данных. Далее нам нужно дождаться, пока все Monos испустят свой результат. Затем мы обрабатываем все данные и строим наш результирующий объект.
У нас нет большого опыта работы с реактивными потоками. Наше решение с вложенными подписками работает, но мы считаем, что может быть лучший способ архивировать то, что мы хотим сделать.
Вопрос 1
Masterdata_A и Masterdata_B могут быть получается параллельно, но как express реагировать без вложенности? Каждый результат getFluxMasterdata_B должен быть объединен с одним результатом getMonoMasterdata_A.
Вопрос 2
Тупель с обеими Masterdatas должен быть каким-то образом ограничен, чтобы не перегружать веб-сервис со многими запросами данных. Фактическая задержка в 1 секунду - это всего лишь предположение, которое, кажется, работает, но было бы лучше определить максимальное количество параллельных выполнений первого внутреннего flatMap, чтобы одновременно иметь максимум N ожидающих вызовов веб-службы.
Вопрос 3
В будущем могут появиться дополнительные данные, которые мы должны получить из веб-службы для построения ProcessingResult. Есть ли лучшая практика для определения реактивного потока, чтобы он был читабельным / понятным? Нормально ли вложение реактивных потоков или его следует избегать (держать все на верхнем уровне)?
DomainModel
private static class Masterdata_A
{
private List<MasterdataRecord_A> records;
}
private static class MasterdataRecord_A { /* ... business relevant fields */ }
private static class MasterdataRecord_B { /* ... business relevant fields */ }
private static class Data_A { /* ... business relevant fields */ }
private static class Data_B { /* ... business relevant fields */ }
private static class Data_C { /* ... business relevant fields */ }
private static class ProcessingResult { /* ... business relevant fields */ }
WebserviceImpl
private static class Webservice
{
private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }
private Mono<Data_A> getMonoData_A() { /* fetch data from external webservice */ }
private Mono<Data_B> getMonoData_B() { /* fetch data from external webservice */ }
private Mono<Data_C> getMonoData_C() { /* fetch data from external webservice */ }
}
BusinessServiceImpl
public class BusinessService
{
public void processData(...params...)
{
Webservice webservie = getWebservice();
// As soon as Mono<Masterdata_A> emits its result AND Flux<Masterdata_B> emits its first result than the first inner flatMap should be executed
// to fetch some extra data from the service based on the actual masterdata.
// For building the ProcessingResult we need access to all data available in the actual context.
webservice.getMonoMasterdata_A()
.subscribe((Masterdata_A masterdataA) -> {
webservice.getFluxMasterdata_B()
.delayElements(Duration.ofSeconds(1))
.flatMap((MasterdataRecord_B masterdataB) -> {
Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
// wait for result of all Monos
return Mono.zip(monoA, monoB, monoC);
})
.flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
Data_A dataA = data.getT1();
Data_B dataB = data.getT2();
Data_C dataC = data.getT3();
// create result from masterdataA, masterdataB, dataA, dataB, dataC
ProcessingResult result = ...;
return Mono.just(result);
})
.subscribe(processingResult -> {
// store result to db/filesystem
});
});
}
}