Что я хочу:
- У меня есть
Observable<T>
- Я хочу кешировать это излучаемое значение
- Я хочу кешировать это излучаемое значение только доЯ сбрасываю кеш
- после аннулирования кеша любой подписчик должен автоматически получать новые данные
Idee
Я создаю BehaviourRelay
, чтоподписывается на источник Observable
и кэширует его последнее значение.Всякий раз, когда кэш должен быть признан недействительным, я отменяю подписку на реле от источника и снова подписываю его на него.
Вот что у меня есть:
class RxCache<T>(
private val observable: Observable<T>,
private val tag: String = "RxCache"
) {
private val source: Observable<T>
private var disposable: Disposable? = null
private val relay = BehaviorRelay.create<T>()
init {
// ) 1) create a new hot observable -
// as we will subscribe to it after every reload again
source = observable.share()
.doAfterNext {
L.d(tag, "data loaded")
}
// 2) first reload call
reload()
}
fun reload() {
// 1) unsubscribe from old observable
disposable?.dispose()
disposable = null
// 2) subscribe relay again to reload data
disposable = source.subscribe(relay)
}
fun observe(): Observable<T> {
return relay.hide()
.doAfterNext {
L.d(tag, "data emitted")
}
}
}
Проблема
Я хочу, чтобы relay
подписывался на source
только в том случае, если у него либо есть сам подписчик, либо как только первый подписчик подписался на ретранслятор.Первое легко, но я не знаю, как решить второе безопасным способом.
Есть идеи?Или альтернативные предложения?