Как объединить / объединить несколько Mono / Flux, содержащих разные типы данных, без вложенных подписок - PullRequest
0 голосов
/ 24 апреля 2020

Мы используем проект-реактор, чтобы извлечь некоторые данные из внешнего веб-сервиса и сгенерировать несколько результирующих объектов.

Сначала нам нужно получить некоторые мастер-данные, которые необходимы для запуска следующих вызовов веб-сервиса. После того, как мастер-данные станут доступны, мы получим еще несколько данных на основе результатов мастер-данных. Далее нам нужно дождаться, пока все Monos испустят свой результат. Затем мы обрабатываем все данные и строим наш результирующий объект.

У нас нет большого опыта работы с реактивными потоками. Наше решение с вложенными подписками работает, но мы считаем, что может быть лучший способ архивировать то, что мы хотим сделать.

Вопрос 1

Masterdata_A и Masterdata_B могут быть получается параллельно, но как express реагировать без вложенности? Каждый результат getFluxMasterdata_B должен быть объединен с одним результатом getMonoMasterdata_A.

Вопрос 2

Тупель с обеими Masterdatas должен быть каким-то образом ограничен, чтобы не перегружать веб-сервис со многими запросами данных. Фактическая задержка в 1 секунду - это всего лишь предположение, которое, кажется, работает, но было бы лучше определить максимальное количество параллельных выполнений первого внутреннего flatMap, чтобы одновременно иметь максимум N ожидающих вызовов веб-службы.

Вопрос 3

В будущем могут появиться дополнительные данные, которые мы должны получить из веб-службы для построения ProcessingResult. Есть ли лучшая практика для определения реактивного потока, чтобы он был читабельным / понятным? Нормально ли вложение реактивных потоков или его следует избегать (держать все на верхнем уровне)?


DomainModel

    private static class Masterdata_A
    {
        private List<MasterdataRecord_A> records;
    }

    private static class MasterdataRecord_A { /* ... business relevant fields */ }
    private static class MasterdataRecord_B { /* ... business relevant fields */ }
    private static class Data_A { /* ... business relevant fields */ }
    private static class Data_B { /* ... business relevant fields */ }
    private static class Data_C { /* ... business relevant fields */ }

    private static class ProcessingResult { /* ... business relevant fields */ }

WebserviceImpl

    private static class Webservice
    {
        private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
        private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }

        private Mono<Data_A> getMonoData_A() { /* fetch data from external webservice */ }
        private Mono<Data_B> getMonoData_B() { /* fetch data from external webservice */ }
        private Mono<Data_C> getMonoData_C() { /* fetch data from external webservice */ }
    }

BusinessServiceImpl

    public class BusinessService
    {
        public void processData(...params...)
        {
            Webservice webservie = getWebservice();
            // As soon as Mono<Masterdata_A> emits its result AND Flux<Masterdata_B> emits its first result than the first inner flatMap should be executed
            // to fetch some extra data from the service based on the actual masterdata.
            // For building the ProcessingResult we need access to all data available in the actual context.
            webservice.getMonoMasterdata_A()
                    .subscribe((Masterdata_A masterdataA) -> {
                        webservice.getFluxMasterdata_B()
                                .delayElements(Duration.ofSeconds(1))
                                .flatMap((MasterdataRecord_B masterdataB) -> {
                                    Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
                                    Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
                                    Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
                                    // wait for result of all Monos
                                    return Mono.zip(monoA, monoB, monoC);
                                })
                                .flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
                                    Data_A dataA = data.getT1();
                                    Data_B dataB = data.getT2();
                                    Data_C dataC = data.getT3();

                                    // create result from masterdataA, masterdataB, dataA, dataB, dataC
                                    ProcessingResult result = ...;
                                    return Mono.just(result);
                                })
                                .subscribe(processingResult -> {
                                    // store result to db/filesystem
                                });
                    });
        }
    }

1 Ответ

1 голос
/ 26 апреля 2020

Вопрос 1

Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();

    // suppose that  getMonoMasterdata_A return just "A" and getFluxMasterdata_B reutrn [1,2,3,,,]
    // then the result will be [(A,1), (A,2), (A,3),,,]
    // masterdataAFlux and masterdataRecordBFlux will execute in parallel
Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)

Вопрос 2

    Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
         // yes that will work just fine for not overwhelming the web services
          // 500 is random value you need to test and tune the optimal value for these services
          .delayElements(Duration.ofMillis(500))
          .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
                    Mono<Data_A> monoA = webservice.getMonoData_A();
                    Mono<Data_B> monoB = webservice.getMonoData_B();
                    Mono<Data_C> monoC = webservice.getMonoData_C();
                    // wait for result of all Monos
                    return Mono.zip(monoA, monoB, monoC);
                  },
                  // flatmap can take the num of concurrent actions
                  // 5 is random value also u need to test and check the best value for that
                  5)

Вопрос 3

взгляните на это https://github.com/reactor/reactive-streams-commons/issues/21

полный пример

 Mono<Masterdata_A> monoMasterdata_a = webservice.getMonoMasterdata_A();
Flux<MasterdataRecord_B> masterdataRecordBFlux = webservice.getFluxMasterdata_B();

    // suppose that  getMonoMasterdata_A return just "A" and getFluxMasterdata_B reutrn [1,2,3,,,]
    // then the result will be [(A,1), (A,2), (A,3),,,]
    // masterdataAFlux and masterdataRecordBFlux will execute in parallel
Flux.combineLatest(monoMasterdata_a, masterdataRecordBFlux, Tuples::of)
         // yes that will work just fine for not overwhelming the web services
          // 500 is random value you need to test and tune the optimal value for these services
          .delayElements(Duration.ofMillis(500))
          .flatMap((Tuple2<Masterdata_A, MasterdataRecord_B> tuple2) -> {
                    Mono<Data_A> monoA = webservice.getMonoData_A();
                    Mono<Data_B> monoB = webservice.getMonoData_B();
                    Mono<Data_C> monoC = webservice.getMonoData_C();
                    // wait for result of all Monos
                    return Mono.zip(monoA, monoB, monoC);
                  },
                  // flatmap can take the num of concurrent actions
                  // 5 is random value also u need to test and check the best value for that
                  5)
          .map(data -> {
            // for the mapping u don't need flatmap because it's an expensive operation
            // map is the right choice
            Data_A dataA = data.getT1();
            Data_B dataB = data.getT2();
            Data_C dataC = data.getT3();

            // create result from masterdataA, masterdataB, dataA, dataB, dataC
            ProcessingResult result = ???;
            return result;
          })
          // it's always better to save in batch
            // 100 is a random value u should put a value that most suitable for your datasource
          .bufferTimeout(100, Duration.ofMillis(100))
          .concatMap(processingResults -> {
            return batchSave(processingResults)
                    // because batchSave is blocking op
                    .subscribeOn(Schedulers.boundedElastic());
          })
          .subscribe();
...