Как запустить исполнение Mono после завершения другого Mono - PullRequest
0 голосов
/ 04 июня 2018

У меня проблема при попытке выполнить Mono в предложении doFinally.Это мой код.

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> stage.get().doFinally(ignored -> doUnlock(lock)));
}

Проблема в том, что doUnlock(lock) внутри doFinally() возвращает моно, на которое никто не подписан, потому что doFinally не является цепочкой.Поэтому часть асинхронного кода в doUnlock фактически никогда не выполняется.

Есть ли способ избежать этого, используя Mono или Flux помощники?

1 Ответ

0 голосов
/ 04 июня 2018

Использование Mono#then.

К сожалению, вы не можете избежать использования Mono / Flux, если ваш API построен поверх него, однако вы можете взломать эту проблему следующим образом.

Чтобы объединить несколько независимых выполнений, которые должны быть подписаны одно за другим, и результат первого будет возвращен после завершения первого, есть оператор Mono#then, который позволяет писать следующий (подобный обещанию) код:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                );
    }
}

Здесь, чтобы выполнить цепочку выполнения, а затем снять блокировку и затем вернуть промежуточное значение, мы используем flatMap для отображения значения в качестве снятия блокировки и then повторного возврата промежуточного значения.(признайте, звучит неловко)

Примечание , в случае ошибка сигнал терминала, then будет игнорироваться.Таким образом, для достижения поведения try-finally может потребоваться предоставление дополнительного оператора orErrorResume, как показано в следующем примере:

public interface Locks {

    Mono<ReactiveDistributedLock> doLock(LockParams params);

    Mono<Boolean> doUnlock(ReactiveDistributedLock lock);

    default <T> Mono<T> withLock(LockParams params, Supplier<Mono<T>> stage) {
        return doLock(params)
                .flatMap(lock -> 
                    stage.get()
                         .flatMap(value -> 
                            doUnlock(lock)
                            .then(Mono.just(value))
                         )
                         .onErrorResume(t -> 
                            doUnlock(lock)
                            .then(Mono.error(t))
                         )
                );
    }
}
...