Rx Java: сканирование с накопителем на основе Single, которое удаляет промежуточные Single - PullRequest
6 голосов
/ 18 июня 2020

Я пишу однонаправленный поток пользовательского интерфейса на основе Rx, где каждое сокращение состояния равно Single. Обычно такие потоки выполняются с scan (им нужно предыдущее состояние), но когда задействовано Single, это немного сложно. Мне удалось заставить его работать следующим образом:

val events = Observable.just("event1", "event2", "event3")
val initialState = Single.just(emptyList<String>())
// given a current state produces next state's Single
val reducer = { currentState: List<String>, event: String ->
  Single.fromCallable { /* do work */ currentState.plus(event) }
}

events
  .scan(
    initialState,
    { currentStateSingle, event ->
      val nextStateSingle = currentStateSingle
        .flatMap { curState -> reducer(curState, event) }
      // cache is required to avoid resubscription 
      // to all previously emitted single's on each new scan iteration
      nextStateSingle.cache()
    }
  )
  .flatMapSingle { it }
  .subscribe { state -> println("state updated to $state") }

Что меня беспокоит, так это то, что каждое событие (которых может быть много в среде пользовательского интерфейса) будет создавать nextStateSingle.cache() и навсегда добавьте его в существующую цепочку, и все эти Single, когда-либо испущенные, останутся там, неограниченно потребляя память и никогда не удаляются, хотя после того, как они один раз исполнили новое состояние, они вообще не нужны.

Я думал, как это сделать с каким-то использованием switchMap или даже с помощью какой-то внешней переменной atomi c для хранения состояния (вместо сканирования), но я не нашел способа.

Единственный другой вариант, который я вижу, - это написать собственный оператор, который будет подписываться на внутренний сингл, ждать результата, а затем удалять его, но я бы хотел избежать написания специального оператора.

1 Ответ

2 голосов
/ 22 июня 2020

Согласно документации кеша, поскольку вы не можете удалить источник и не можете очистить кешированные значения, вы можете использовать этот обходной путь, который может управлять кешем и очищать измененные значения, забывая все ссылки:

AtomicBoolean shouldStop = new AtomicBoolean();

source.takeUntil(v -> shouldStop.get())
        .onTerminateDetach()
        .cache()
        .takeUntil(v -> shouldStop.get())
        .onTerminateDetach()
        .subscribe(...);

Тогда, возможно, вы сможете сохранить состояние и время от времени обнулять ссылки.

...