что делает Mono.defer ()? - PullRequest
       23

что делает Mono.defer ()?

3 голосов
/ 02 мая 2019

Я сталкивался с Mono.defer () в каком-то коде Spring Webflux

Я искал метод в документах, но не понимаю объяснения:

"Создать Monoпровайдер, который предоставит целевой Mono для подписки для каждого подписчика нисходящего направления "

, пожалуйста, не могли бы вы получить объяснение и пример.Есть ли место с кучей примеров кода Reactor (их модульные тесты?), На которые я мог бы сослаться.

спасибо

Ответы [ 3 ]

7 голосов
/ 03 мая 2019

Это немного упрощение, но концептуально источники Reactor либо ленивы, либо нетерпеливы.Ожидается, что более продвинутые, такие как HTTP-запрос, будут лениво оцениваться.С другой стороны, стремятся самые простые, такие как Mono.just или Flux.fromIterable.

Под этим я подразумеваю, что вызов Mono.just(System.currentTimeMillis()) немедленно вызовет метод currentTimeMillis() и захватит результат.Указанный результат только испускается Mono после того, как он подписан.Многократная подписка также не меняет значение:

Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0

Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0

Оператор defer призван сделать этот источник ленивым, переоценивая содержимое лямбды каждый раз, когда появляется новыйподписчик :

Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //invoked currentTimeMillis() here and returns t10

Thread.sleep(7_000);
//time == t17
clock.block(); //invoke currentTimeMillis() once again here and returns t17
4 голосов
/ 03 мая 2019

с простыми словами, если вы видите на первый взгляд, это похоже на Mono.just (), но это не так.когда вы запускаете Mono.just (), он сразу же создает Observable (Mono) и повторно использует его, но когда вы используете defer, он не создает его сразу, он создает новый Observable при каждой подписке.

Один вариант использования, чтобы увидетьразница

    int a = 5;
@Override
public void run(String... args) throws Exception {

    Mono<Integer> monoJust = Mono.just(a);
    Mono<Integer> monoDefer = Mono.defer(() -> Mono.just(a));

    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));

    a = 7;
    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));
}

печать: 5,5,5,7

, если вы видите, что mono.just сразу же создал наблюдаемое, и оно не изменится, даже если значение изменилось, ноотложите создание наблюдаемого в подписке, чтобы вы работали с текущим значением подписки

0 голосов
/ 27 мая 2019

Я пытался defer для другого варианта использования. Написал код ниже, чтобы проверить и поделиться, так как это может помочь другим. Мой вариант использования состоял в том, чтобы связать два Mono с и убедиться, что первый завершен, прежде чем второй будет занят. А второй содержал блокирующий вызов, чей результат используется для ответа Mono либо empty, либо error ответом. Без defer мой блокирующий вызов выполняется независимо от результата первого. Но при использовании defer блокирующий вызов выполняется только после завершения первого Mono. Код ниже:

public static void main(String[] args) {
    long cur = System.currentTimeMillis();
    boolean succeed = true;

    Mono<Integer> monoJust = Mono.create(consumer -> {
        System.out.println("MonoJust inside " + (System.currentTimeMillis() - cur));
        if (succeed) {
            consumer.success(1);
        } else {
            consumer.error(new RuntimeException("aaa"));
        }
    });

    Mono<String> monoJustStr = Mono.create(consumer -> {
        System.out.println("MonoJustStr inside: " + (System.currentTimeMillis() - cur));
        consumer.success("one");
    });

    System.out.println("##1##: Begin");
    monoJust.then(evaluator() ? Mono.empty() : monoJustStr).subscribe(d -> System.out.println("##1##: "+d), e-> System.err.println(e));
    System.out.println("##1##: Done: "+(System.currentTimeMillis() - cur));

    System.out.println("\n\n\n##2##: Begin");
    monoJust.then(Mono.defer(() -> evaluator() ? Mono.empty() : monoJustStr)).subscribe(d -> System.out.println("##2##: "+d), e-> System.err.println(e));
    System.out.println("##2##: Done: " + (System.currentTimeMillis() - cur));

}

private static boolean evaluator() {
    System.out.println("Inside Evaluator");
    return false;
}

Вывод с succeed=true - Соблюдать последовательность «Inside Evaluator» и «MonoJust inside»

##1##: Begin
Inside Evaluator
MonoJust inside 540
MonoJustStr inside: 542
##1##: one
##1##: Done: 542



##2##: Begin
MonoJust inside 544
Inside Evaluator
MonoJustStr inside: 544
##2##: one
##2##: Done: 544

Ниже выводится succeed = false - Обратите внимание, что оценщик не вызывается.

##1##: Begin
Inside Evaluator
MonoJust inside 565
java.lang.RuntimeException: aaa
##1##: Done: 567



##2##: Begin
MonoJust inside 569
java.lang.RuntimeException: aaa
##2##: Done: 569
...