Как конкатировать вложенный поток - PullRequest
1 голос
/ 06 ноября 2019

Как объединить два потока с вложенным? Почему выполнение этого кода никогда не заканчивается?

@Test
fun `concatenating two flux`() {

    val names = listOf("israel", "israel")

    val a = Flux.just("a", "v")
            .flatMap { it.toUpperCase().toMono() }
            .concatWith { names.joinToString(" ").toMono() }

    StepVerifier.create(a).expectNext("A", "V", "israel israel").verifyComplete()
}

, когда у меня есть поток с разделенной переменной, выполнение выполняется как ожидалось

@Test
fun `concatenating two flux`() {

    val names = listOf("israel", "israel")

    val b = names.joinToString(" ").toMono()

    val a = Flux.just("a", "v")
            .flatMap { it.toUpperCase().toMono() }

    val c = a.concatWith(b)

    StepVerifier.create(c.log()).expectNext("A", "V", "israel israel").verifyComplete()
}

1 Ответ

2 голосов
/ 07 ноября 2019

Вам необходимо использовать () целое число {} в concatWith()

// RIGHT!

    Flux.just("a", "v")
                    .flatMap { it.toUpperCase().toMono() }
                    .concatWith ( names.joinToString(" ").toMono() )

// WRONG!

    Flux.just("a", "v")
                    .flatMap { it.toUpperCase().toMono() }
                    .concatWith { names.joinToString(" ").toMono() }

Большинство методов Rx2 принимают действительные лямбда-выражения, некоторые методы принимают Callable<ObservableSource<T>> вместо ObservableSource,другие берут Function<T, ObservableSource<R>>.

Observable.defer { Observable.just(1) } - это будет работать нормально.

или observable.flatMap { Observable.just(1) } - тоже будет работать как положено (если вы намеренно игнорируете входящий параметр).

И третий - это тот факт, что мы привыкли к Rx1, который всегда брал Observable в своем методе andThen(), который не может быть представлен как лямбда, поэтому нам нужно вместо этого использовать ()из {}

...