Мне нужно сделать асинхронное сканирование потока объектов.
Если я делаю это с одним (показано ниже), ему быстро не хватает памяти. Кажется, что синглы сохраняют ссылки все время назад, хотя я использую onTerminateDetatch.
object Test extends App {
import io.reactivex.{ Single, Observable, functions }
def identity[T] = new functions.Function[T, T] {
override def apply(t: T): T = t
}
Observable.fromCallable[Array[Int]](() => {
Array.fill(10000000)(100)
})
.repeat()
.scanWith[Single[Array[Int]]](() => Single.just(Array.emptyIntArray), (single, a) => {
single
.map[Array[Int]](_ => a)
.cache()
.onTerminateDetach()
})
.flatMapSingle[Array[Int]](identity[Single[Array[Int]]])
.subscribe(a => println(a.length))
}
Это очень быстро исчерпает память.