Получение первого onNext сигнала из списка Mono - PullRequest
0 голосов
/ 03 марта 2019

Учтите, что есть 3 функции, результатом которых является Mono<Int> с.Я пытаюсь получить первый результат, испускаемый любым из Monos.Вот тест для описания того, что я ищу:

fun main() {
  StepVerifier
    .create(firstElement())
    .expectSubscription()
    .expectNext(3)
    .expectComplete()
    .verify()
}

fun firstElement(): Mono<Int> = Flux.concat(_1(), _2(), _3(), _4()).next()

fun _1(): Mono<Int> = 1.toMono().delayElement(Duration.ofMillis(1000))
fun _2(): Mono<Int> = Mono.empty()
fun _3(): Mono<Int> = 3.toMono().delayElement(Duration.ofMillis(500))
fun _4(): Mono<Int> = Mono.error(RuntimeException())

Вопрос в firstElement(), как получить 3, так как он первым испускает элемент.Но, как вы можете видеть, из любого из Monos:

  • Возможно, что любой из них может излучать быстрее, чем остальные
  • Возможно, что любой из них может излучать пустой илиonComplete()
  • Возможно, что любой из них может выдать ошибку или onError()

Я пробовал несколько операторов:

  • Mono.zip {...} требуетсявсе они излучают, потому что возврат Tuple<Int!>
  • Mono.first(...) и Flux.first(...).next() передает onComplete() и / или onError()
  • Flux.concat(...) исключает onComplete() иonError(), но он все еще последовательно подписывается на основе порядка Publisher<T> s

1 Ответ

0 голосов
/ 04 марта 2019

Вы можете продолжить по ошибке с пустым Mono и объединить свои функции

private Mono<Integer> firstElement() {
    return Flux.merge(
            _1().onErrorResume(ignored -> Mono.empty()),
            _2().onErrorResume(ignored -> Mono.empty()),
            _3().onErrorResume(ignored -> Mono.empty()),
            _4().onErrorResume(ignored -> Mono.empty()))
            .next();
}
...