Я пишу приложение для Android и пытаюсь реализовать шаблон репозитория, но с некоторыми особыми требованиями
1) Я хочу, чтобы подписчики от 1 до N видели изменения в репозитории.данные, если они изменяются
2) Я хочу кешировать данные локально в кеше памяти и дискового кеша, а также иметь сетевой источник данных
3) Я хочу иметь возможность форсироватьобновление из сети на основе значения флага, переданного в
4) Повторяющиеся данные не должны возвращаться абоненту
5) Сетевые ошибки не должны прерывать поток.
Моя текущая реализация показана ниже и, кажется, работает хорошо, однако часть, в которой я не уверен, заключается в том, если это плохая идея подписаться на мою сеть, наблюдаемую в репозитории, как я, просто вызывая метод subscribe ()он и его обновление БД, которая затем распространяет события всем подписчикам.Есть ли лучший способ включить необязательное принудительное обновление сети при сохранении многоадресной рассылки?
Я ранее пытался сделать:
fetchDataFromMemory()
.concatWith(fetchDataFromDb())
.flatMap {
if (forceRefresh) return@flatmap fetchDataFromNetwork
return@flatmap Observable.just(it)
}
Однако я столкнулся с бесконечным циклом, где, если forceRefresh был равен true, тогдасеть обновит БД, которая выдаст наблюдаемое, которое вызовет flatMap, которое выполнит обновление сети, которое обновит БД и т. д. до бесконечности.
class Repository {
override fun getAuthToken(): Observable<AuthEntity> {
// TODO make sure you understand how this works in all cases
return memory.getAuthEntity()
.concatWith(authDao.get())
.firstOrError()
.toObservable()
}
private fun refreshAccessTokenNonResponse(): Observable<AuthEntity> {
return getAuthToken()
.flatMap {
// Log.i("TAG", "kicking off token refresh")
networkSource
.refreshTokenNonResponse(BuildConfig.AUTH_URL + NetworkPaths.AUTH, it.refreshToken)
.doOnNext { authEntity ->
memory.setAuthEntity(authEntity)
authDao.upsertAuth(authEntity)
}
}
}
val requestsCache = ConcurrentHashMap<String, Observable<*>>()
val myObs: Observable<Pair<List<Data>?, Exception?>> = fetchDataFromMemory().concatWith(fetchDataFromDb()).share()
override fun getData(forceRefresh: Boolean): Observable<Pair<List<DataEntity>?, Exception?>> {
if (forceRefresh && !requestsCache.containsKey(GET_DATA)) {
fetchDataFromNetwork()
.subscribeOn(Schedulers.io())
.subscribe()
}
return myObs
.distinct()
}
fun fetchDataFromMemory(): Observable<Pair<List<Data>?, Exception?>> {
return memory.getData()
.map {
Log.i("TAG", "memory observable map")
Pair(it, null)
}
}
fun fetchDataFromDb(): Observable<Pair<List<Data>?, Exception?>> {
return dataDao
.getAllData()
.doOnNext {
Log.i("TAG", "RETURNING DATA FROM DB")
memory.setData(it)
}
.map { Pair(it, null) as Pair<List<Data>?, Exception?> }
}
fun fetchDataFromNetwork(): Observable<Pair<List<Data>?, Exception?>> {
return getAuthToken()
.flatMap {
Log.i("TAG", "Kicking off network request")
val netObs: Observable<Pair<Response?, Exception?>> = networkSource
.getDataNonResponse(BuildConfig.BASE_URL + NetworkPaths.DATA, "Bearer " + it.accessToken, body)
.map { response -> Pair(response, null) as Pair<DataResponse?, Exception?> }
requestsCache.put(GET_DATA, netObs)
return@flatMap netObs
}
.onErrorResumeNext { throwable: Throwable ->
Log.i("TAG", "Network Request Error: " + throwable.toString())
if (throwable is HttpException && throwable.code() == 401) {
return@onErrorResumeNext refreshAccessTokenNonResponse()
.flatMap {
networkSource
.getDataNonResponse(BuildConfig.BASE_URL + NetworkPaths.WORK_ORDER, "Bearer " + it.accessToken, body)
}
.map { response -> Pair(response, null) as Pair<DataResponse?, Exception?> }
.onErrorResumeNext { throwable: Throwable -> Observable.just(Pair(null, Exception(throwable))) }
}
return@onErrorResumeNext Observable.just(Pair(null, Exception(throwable)) as Pair<DataResponse?, Exception?>)
}
.map {
Log.i("TAG", "Network Request map")
Pair(it.first?.team?.data, it.second) as Pair<List<Data>?, Exception?>
}
.doOnNext { result: Pair<List<Data>?, Exception?> ->
Log.i("TAG", "Network Request do on next: " + result.toString())
requestsCache.remove(GET_DATA)
if (result.first != null) {
dataDao.insertAll(result.first as List<Data>)
memory.setData(result.first as List<Data>)
}
}
}
}