RxJava2 имеет удаленные данные, переопределяющие локальные данные в Observable - PullRequest
0 голосов
/ 20 ноября 2018

В настоящее время у меня есть метод в классе репозитория, который извлекает данные как из локального кэша, так и из удаленного API.

public Observable<List<Items>> getItemsForUser(String userId {
    return Observable.concatArrayEager(
            getUserItemsLocal(userId), // returns Observable<List<Items>>
            getUserItemsRemote(userId) // returns Observable<List<Items>>
    );
}

В настоящее время метод сначала выбирает локальные данные (которые могут быть устаревшими) ивозвращает его, затем обновляет его свежими данными из удаленного API.

Я хочу изменить реализацию на использование Observable.merge, чтобы, если запрос удаленного API завершился первым, эти данные были показаны первыми.Однако, если я просто использую Observable.merge, я обеспокоен тем, что запрос локальной базы данных может вернуть устаревшие данные, которые затем перезапишут свежие данные с удаленного компьютера.

По сути, я хочу что-то вроде:

public Observable<List<ShoutContent>> getItemsForUser(String userId, ErrorCallback errorCallback) {
    return Observable.merge(
            getUserItemsRemote(userId),
            getUserItemsLocal(userId)
                .useOnlyIfFirstResponse()
}

Так что, если сначала выполняется удаленный запрос API, то этот ответ является единственным, который возвращается.Но если локальный запрос завершается первым, я хочу вернуть его, а затем вернуть удаленный запрос после его завершения.Есть ли в RxJava что-то подобное встроенное?

Редактировать: Я хотел бы добавить, что getUserItemsRemote обновляет локальную базу данных, когда Observable испускает, но я не думаю, что смогу гарантировать, что база данных будетобновляется до завершения локального запроса, что оставляет вероятность того, что локальный запрос ответит устаревшими данными.

1 Ответ

0 голосов
/ 20 ноября 2018

Вы можете использовать оператор takeUntil.

takeUntil возвращает Observable, который испускает элементы, испускаемые источником Observable, пока второй ObservableSource не испустит элемент.

В вашем случае вам необходимо прекратить наблюдение локальной наблюдаемой, как только удаленная наблюдаемая будет выпущена.Код показан ниже.

public Observable<String> getUserItemsLocal() {
    return Observable.just("Local db response")
            .delay(5, TimeUnit.SECONDS);  // assume local db takes 5 seconds to emit
}


public Observable<String> getUserItemsRemote() {
    return Observable.just("Remote Data")
            .delay(1, TimeUnit.SECONDS); // remote data comes quicker, in 1 second
}

Ваш код хранилища выглядит как

Observable<String> remoteResponse = getUserItemsRemote();
getUserItemsLocal().takeUntil(remoteResponse)
            .mergeWith(remoteResponse)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "result: " + s);
                }
            });         
...