Я пишу однонаправленный поток пользовательского интерфейса на основе Rx, где каждое сокращение состояния равно Single
. Обычно такие потоки выполняются с scan
(им нужно предыдущее состояние), но когда задействовано Single
, это немного сложно. Мне удалось заставить его работать следующим образом:
val events = Observable.just("event1", "event2", "event3")
val initialState = Single.just(emptyList<String>())
// given a current state produces next state's Single
val reducer = { currentState: List<String>, event: String ->
Single.fromCallable { /* do work */ currentState.plus(event) }
}
events
.scan(
initialState,
{ currentStateSingle, event ->
val nextStateSingle = currentStateSingle
.flatMap { curState -> reducer(curState, event) }
// cache is required to avoid resubscription
// to all previously emitted single's on each new scan iteration
nextStateSingle.cache()
}
)
.flatMapSingle { it }
.subscribe { state -> println("state updated to $state") }
Что меня беспокоит, так это то, что каждое событие (которых может быть много в среде пользовательского интерфейса) будет создавать nextStateSingle.cache()
и навсегда добавьте его в существующую цепочку, и все эти Single
, когда-либо испущенные, останутся там, неограниченно потребляя память и никогда не удаляются, хотя после того, как они один раз исполнили новое состояние, они вообще не нужны.
Я думал, как это сделать с каким-то использованием switchMap
или даже с помощью какой-то внешней переменной atomi c для хранения состояния (вместо сканирования), но я не нашел способа.
Единственный другой вариант, который я вижу, - это написать собственный оператор, который будет подписываться на внутренний сингл, ждать результата, а затем удалять его, но я бы хотел избежать написания специального оператора.