Простая программа, приведенная ниже, в конечном итоге зависает.
// Kotlin
package com.example
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS
fun main() {
fun incr(n: Int): Single<Int> = Single.just(n + 1)
fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
if (n < max)
incr(n).observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max) }
else
Flowable.empty()
)
numbers(1, 1_000_000).sample(5, SECONDS).blockingForEach(::println)
}
На моем ноутбуке она обычно висит где-то после 23500, пример вывода:
15945
21159
23802
Вопрос на самом деле состоит из двух частей:
- Можно ли рекурсивно сгенерировать поток Rx Java в безопасном для стека виде без зависаний, используя операторы, размещенные в базовой библиотеке?
- Если да, то каков метод для достижения этого?