Как я могу кэшировать одиночный результат в rxjava2 без нехватки памяти? - PullRequest
0 голосов
/ 17 мая 2019

Мне нужно сделать асинхронное сканирование потока объектов.

Если я делаю это с одним (показано ниже), ему быстро не хватает памяти. Кажется, что синглы сохраняют ссылки все время назад, хотя я использую onTerminateDetatch.

object Test extends App {
  import io.reactivex.{ Single, Observable, functions }
  def identity[T] = new functions.Function[T, T] {
    override def apply(t: T): T = t
  }
  Observable.fromCallable[Array[Int]](() => {
    Array.fill(10000000)(100)
  })
    .repeat()
    .scanWith[Single[Array[Int]]](() => Single.just(Array.emptyIntArray), (single, a) => {
      single
        .map[Array[Int]](_ => a)
        .cache()
        .onTerminateDetach()
    })
    .flatMapSingle[Array[Int]](identity[Single[Array[Int]]])
    .subscribe(a => println(a.length))
}

Это очень быстро исчерпает память.

...