Реактор Mono publi sh для нескольких методов - PullRequest
0 голосов
/ 24 апреля 2020

У меня проблема с публикацией объекта несколькими методами. Упрощенная версия моего кода приведена ниже.

package org.example.reactive;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuple3;

@Slf4j
public class MonoTest {

    public static void main(String... args) {
        MonoTest m = new MonoTest();
        Mono<A> aMono = m.getA();
        Mono<B> bMono = aMono.flatMap(m::getB);
        Mono<C> cMono = aMono.flatMap(m::getC);
        Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD);
        Mono<E> eMono = Mono.zip(aMono, cMono, dMono)
                .flatMap(m::getE);
        aMono
                .zipWith(eMono)
                .subscribe(m::onCompleted, m::onFailed);
    }

    private Mono<A> getA(){
        log.info("inside getA");
        return Mono.just(new A());
    }
    private Mono<B> getB(A a){
        log.info("inside getB");
        return Mono.just(new B());
    }
    private Mono<C> getC(A a){
        log.info("inside getC");
        return Mono.just(new C());
    }
    private Mono<D> getD(Tuple2 t){
        log.info("inside getD");
        return Mono.just(new D());
    }
    private Mono<E> getE(Tuple3 t){
        log.info("inside getE");
        return Mono.just(new E());
    }
    private void onCompleted(Tuple2 t){
        log.info("inside onCompleted");
    }
    private void onFailed(Throwable t){
        log.info("inside onFailed");
    }

    class A {}
    class B {}
    class C {}
    class D {}
    class E {}
}

Я ожидаю вызова каждого метода только один раз. Но get C вызывается дважды. Что здесь не так? Вывод программы следующий:

org.example.reactive.MonoTest - внутри getA

org.example.reactive.MonoTest - внутри get C

org.example. реактивный.MonoTest - внутри get C

org.example.reactive.MonoTest - внутри getB

org.example.reactive.MonoTest - внутри getD

org.example .reactive.MonoTest - внутри getE

org.example.reactive.MonoTest - внутри onCompleted

EDIT

Что ж, я мог бы решить это с помощью кэширования следующим образом.

        Mono<A> aMono = m.getA().cache();
        Mono<B> bMono = aMono.flatMap(m::getB).cache();
        Mono<C> cMono = aMono.flatMap(m::getC).cache();
        Mono<D> dMono = cMono.zipWith(bMono).flatMap(m::getD).cache();

1 Ответ

0 голосов
/ 24 апреля 2020

В вашем наборе Mono s есть два шаблона:

  • aMono является константой и решается один раз с нетерпением из-за прямого присвоения переменной (вы вызываете getA() один раз)
  • , с другой стороны, другие моно вызывают getX() методы внутри операторов, в частности flatMap. Это означает, что эти вызовы выполняются лениво, когда монофоническое отображение подписано на

aMono - это единственный вызов верхнего уровня метода getX(). Замените переменные Mono на их определения, за исключением aMono, и вам станет понятнее, что произойдет:

MonoTest m = new MonoTest();

Mono<A> aMono = m.getA(); // <-- getA log
aMono.zipWith(
    Mono.zip(
       aMono,
       aMono.flatMap(m::getC),  // <-- getC log
       aMono.flatMap(m::getC) // <-- getC log
            .zipWith(aMono.flatMap(m::getB)) // <-- getB log
            .flatMap(m::getD) // <-- getD log
    ).flatMap(m::getE) // <-- getE log
  ).subscribe(...);

, поэтому вы получаете количество и порядок отчетов, о которых вы сообщили.

...