Шаблон репозитория RxJava с несколькими подписчиками и обновлением данных - PullRequest
0 голосов
/ 18 декабря 2018

Я пишу приложение для Android и пытаюсь реализовать шаблон репозитория, но с некоторыми особыми требованиями

1) Я хочу, чтобы подписчики от 1 до N видели изменения в репозитории.данные, если они изменяются

2) Я хочу кешировать данные локально в кеше памяти и дискового кеша, а также иметь сетевой источник данных

3) Я хочу иметь возможность форсироватьобновление из сети на основе значения флага, переданного в

4) Повторяющиеся данные не должны возвращаться абоненту

5) Сетевые ошибки не должны прерывать поток.

Моя текущая реализация показана ниже и, кажется, работает хорошо, однако часть, в которой я не уверен, заключается в том, если это плохая идея подписаться на мою сеть, наблюдаемую в репозитории, как я, просто вызывая метод subscribe ()он и его обновление БД, которая затем распространяет события всем подписчикам.Есть ли лучший способ включить необязательное принудительное обновление сети при сохранении многоадресной рассылки?

Я ранее пытался сделать:

fetchDataFromMemory()
    .concatWith(fetchDataFromDb())
    .flatMap { 
        if (forceRefresh) return@flatmap fetchDataFromNetwork 
        return@flatmap Observable.just(it) 
    }

Однако я столкнулся с бесконечным циклом, где, если forceRefresh был равен true, тогдасеть обновит БД, которая выдаст наблюдаемое, которое вызовет flatMap, которое выполнит обновление сети, которое обновит БД и т. д. до бесконечности.

class Repository {

override fun getAuthToken(): Observable<AuthEntity> {
    // TODO make sure you understand how this works in all cases
    return memory.getAuthEntity()
            .concatWith(authDao.get())
            .firstOrError()
            .toObservable()
}

private fun refreshAccessTokenNonResponse(): Observable<AuthEntity> {
    return getAuthToken()
            .flatMap {
                //                    Log.i("TAG", "kicking off token refresh")
                networkSource
                        .refreshTokenNonResponse(BuildConfig.AUTH_URL + NetworkPaths.AUTH, it.refreshToken)
                        .doOnNext { authEntity ->
                            memory.setAuthEntity(authEntity)
                            authDao.upsertAuth(authEntity)
                        }
            }
}

val requestsCache = ConcurrentHashMap<String, Observable<*>>()

val myObs: Observable<Pair<List<Data>?, Exception?>> = fetchDataFromMemory().concatWith(fetchDataFromDb()).share()

override fun getData(forceRefresh: Boolean): Observable<Pair<List<DataEntity>?, Exception?>> {
    if (forceRefresh && !requestsCache.containsKey(GET_DATA)) {
        fetchDataFromNetwork()
                .subscribeOn(Schedulers.io())
                .subscribe()
    }
    return myObs
            .distinct()
}

fun fetchDataFromMemory(): Observable<Pair<List<Data>?, Exception?>> {
    return memory.getData()
            .map {
                Log.i("TAG", "memory observable map")
                Pair(it, null)
            }
}

fun fetchDataFromDb(): Observable<Pair<List<Data>?, Exception?>> {
    return dataDao
            .getAllData()
            .doOnNext {
                Log.i("TAG", "RETURNING DATA FROM DB")
                memory.setData(it)
            }
            .map { Pair(it, null) as Pair<List<Data>?, Exception?> }
}

fun fetchDataFromNetwork(): Observable<Pair<List<Data>?, Exception?>> {
    return getAuthToken()
            .flatMap {
                Log.i("TAG", "Kicking off network request")
                val netObs: Observable<Pair<Response?, Exception?>> = networkSource
                        .getDataNonResponse(BuildConfig.BASE_URL + NetworkPaths.DATA, "Bearer " + it.accessToken, body)
                        .map { response -> Pair(response, null) as Pair<DataResponse?, Exception?> }
                requestsCache.put(GET_DATA, netObs)
                return@flatMap netObs
            }
            .onErrorResumeNext { throwable: Throwable ->
                Log.i("TAG", "Network Request Error: " + throwable.toString())

                if (throwable is HttpException && throwable.code() == 401) {
                    return@onErrorResumeNext refreshAccessTokenNonResponse()
                            .flatMap {
                                networkSource
                                        .getDataNonResponse(BuildConfig.BASE_URL + NetworkPaths.WORK_ORDER, "Bearer " + it.accessToken, body)
                            }
                            .map { response -> Pair(response, null) as Pair<DataResponse?, Exception?> }
                            .onErrorResumeNext { throwable: Throwable -> Observable.just(Pair(null, Exception(throwable))) }
                }
                return@onErrorResumeNext Observable.just(Pair(null, Exception(throwable)) as Pair<DataResponse?, Exception?>)
            }
            .map {
                Log.i("TAG", "Network Request map")
                Pair(it.first?.team?.data, it.second) as Pair<List<Data>?, Exception?>
            }
            .doOnNext { result: Pair<List<Data>?, Exception?> ->
                Log.i("TAG", "Network Request do on next: " + result.toString())
                requestsCache.remove(GET_DATA)
                if (result.first != null) {
                    dataDao.insertAll(result.first as List<Data>)
                    memory.setData(result.first as List<Data>)
                }
            }
}

}

...