Кеш с аннулированием кеша и отложенной подпиской на источник - PullRequest
0 голосов
/ 27 января 2019

Что я хочу:

  • У меня есть Observable<T>
  • Я хочу кешировать это излучаемое значение
  • Я хочу кешировать это излучаемое значение только доЯ сбрасываю кеш
  • после аннулирования кеша любой подписчик должен автоматически получать новые данные

Idee

Я создаю BehaviourRelay, чтоподписывается на источник Observable и кэширует его последнее значение.Всякий раз, когда кэш должен быть признан недействительным, я отменяю подписку на реле от источника и снова подписываю его на него.

Вот что у меня есть:

class RxCache<T>(
        private val observable: Observable<T>,
        private val tag: String = "RxCache"
) {

    private val source: Observable<T>
    private var disposable: Disposable? = null
    private val relay = BehaviorRelay.create<T>()

    init {
        // ) 1) create a new hot observable - 
        // as we will subscribe to it after every reload again
        source = observable.share()
                .doAfterNext {
                    L.d(tag, "data loaded")
                }
        // 2) first reload call
        reload()
    }

    fun reload() {
        // 1) unsubscribe from old observable
        disposable?.dispose()
        disposable = null
        // 2) subscribe relay again to reload data
        disposable = source.subscribe(relay)
    }

    fun observe(): Observable<T> {
        return relay.hide()
                .doAfterNext {
                    L.d(tag, "data emitted")
                }
    }
}

Проблема

Я хочу, чтобы relay подписывался на source только в том случае, если у него либо есть сам подписчик, либо как только первый подписчик подписался на ретранслятор.Первое легко, но я не знаю, как решить второе безопасным способом.

Есть идеи?Или альтернативные предложения?

Ответы [ 2 ]

0 голосов
/ 31 января 2019

Давайте сделаем низкотехнологичную версию:

Map<T, Observable<T> caches = new ConcurrentHashMap<>();

/* .... */
caches.computeIfAbsent(key, k ->  generateSourceObservable(k).cache())
      .doOnNext(...) // or whatever,  continue your processing pipeline.

Тогда вы сможете просто очистить или удалить ключи из кэша, и существующие реактивные потоки не будут затронуты.

0 голосов
/ 28 января 2019

Как вы уже поняли, невозможно "удалить" последнее значение из BehaviorSubject.Я думаю, что ваш код каратэ с внутренним Observable, переданным через, не является хорошим решением во многих отношениях.Вот код, который может вам помочь:

sealed class CacheItem<T> { // (1)
    class Data<T>(val data: T) : CacheItem<T>()
    class Reset<T> : CacheItem<T>()
}

class RxCache<T> {

    private val behaviorSubject: BehaviorSubject<CacheItem<T>> = BehaviorSubject.create()

    fun reset() {
        behaviorSubject.onNext(CacheItem.Reset()) // (2)
    }

    fun add(newItem: T) {
        behaviorSubject.onNext(CacheItem.Data(newItem)) // (3)
    }

    fun observe() : Observable<T> {
        return behaviorSubject.hide()
            .filter { it != CacheItem.Reset<T>() } // (4)
            .map { (it as CacheItem.Data<T>).data } // (5)
    }
}

Давайте объясним интересные отмеченные части:

  1. Я создал CacheItem, который указывает, содержит ли subject в настоящий момент кэшированные данные или перезагрузка/ сброс был выполнен.
  2. Каждый раз, когда вы хотите «сбросить» тему, вы нажимаете Reset объект, чтобы указать, что нет доступных данных.
  3. Если вы хотите добавить новые кэшированные данные, вы просто нажимаетеданные, завернутые в класс CacheItem.Data, в тему.
  4. Мы хотим игнорировать все значения сброса и получать только испущенные Data элементы.
  5. Наконец мы удаляем оболочку CacheItem.Data и получаем необработанное кэшированноезначение.
...