Как рекурсивно сгенерировать поток Rx Java, используя операторы из базовой библиотеки? - PullRequest
0 голосов
/ 11 апреля 2020

Простая программа, приведенная ниже, в конечном итоге зависает.

// Kotlin
package com.example

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
    fun incr(n: Int): Single<Int> = Single.just(n + 1)

    fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
        if (n < max)
            incr(n).observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max) }
        else
            Flowable.empty()
    )

    numbers(1, 1_000_000).sample(5, SECONDS).blockingForEach(::println)
}

На моем ноутбуке она обычно висит где-то после 23500, пример вывода:

15945
21159
23802

Вопрос на самом деле состоит из двух частей:

  • Можно ли рекурсивно сгенерировать поток Rx Java в безопасном для стека виде без зависаний, используя операторы, размещенные в базовой библиотеке?
  • Если да, то каков метод для достижения этого?

1 Ответ

0 голосов
/ 16 апреля 2020

Потенциальное решение, основанное на предложениях в комментариях:

package com.example

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.processors.UnicastProcessor
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
    // doesn't seem to work with PublishProcessor or MulticastProcessor
    fun <T> unfold(seed: T, next: (T) -> T?): Flowable<T> =
        UnicastProcessor.create<T>().toSerialized().let { proc ->
            proc
                .startWithItem(seed)
                .doOnNext { prev ->
                    when (val curr = next(prev)) {
                        null ->
                            proc.onComplete()
                        else ->
                            proc.onNext(curr)
                    }
                }
        }

    fun numbers(first: Int, max: Int): Flowable<Int> =
        unfold(first) { prev -> if (prev < max) prev + 1 else null }

    numbers(1, 1_000_000_000)
        .sample(1, SECONDS)
        .blockingForEach(::println)
}
...