RxJava: предотвращать излучение наблюдаемой до тех пор, пока не будут отправлены данные из другой наблюдаемой - PullRequest
0 голосов
/ 09 ноября 2019

Блок кода ниже предназначен для работы в автономном режиме. Если данные испускаются наблюдаемой памятью, локальная и удаленная наблюдаемая никогда не сработают. Если данные не хранятся в памяти, локальная наблюдаемая попытается прочитать данные из базы данных комнаты, а если все остальное не удастся, тогда удаленная наблюдаемая запрашивает API.

Удаленный источник использует модификацию для отправки запросов и возвращает потоковую информацию, котораязатем преобразуется в наблюдаемую. Однако перед удаленными наблюдаемыми пожарами у меня есть еще одна наблюдаемая, которая возвращает данные о местоположении, необходимые для запроса. Другими словами, дистанционная наблюдаемая зависит от наблюдаемого местоположения. Как я могу использовать RxJava для предотвращения вызова удаленной наблюдаемой в операторе Concat, пока не будут доступны данные о местоположении?

locationObservable = locationSource.getLocationObservable();
memory = source.getSuggestionsFromMemory();
local = source.getSuggestionsFromDisk();


remote = source.getSuggestionsFromNetwork(parameters)
                    .skipUntil(locationObservable);

locationObservable.subscribe(
                    source -> parameters = ParamManager.queryParameters(
                                    source.getLatitude() + "," + source.getLongitude()),

                    error -> Log.println(Log.ERROR, TAG, error.getMessage()
                    )
            );

Observable.concat(memory,local, remote)
            .firstElement()
            .subscribeOn(Schedulers.io())
            .toObservable()
            .observeOn(AndroidSchedulers.mainThread());

удаленная наблюдаемая:

public Observable<List<Venue>> getSuggestionsFromNetwork(HashMap<String, String> parameters){
    return remoteSource.getData(parameters).doOnNext(
            data -> {
                localSource.cacheDataToDisk(data);
                memorySource.cacheDataInMemory(data);
            });
}

удаленный источник:

Observable<List<Venue>> getData(HashMap<String, String> params){
    return Flowable.zip(loadSearchVenues(params), loadTrendingVenues(params),
            loadRecommendedVenues(params), (search, trending, recommended) -> {

                generalVenues = search.getResponse().getSuggestions();
                trendingVenues = trending.getResponse().getSuggestions();
                recommendedVenues = recommended.getResponse().getSuggestions();

                allVenues.addAll(generalVenues);
                allVenues.addAll(trendingVenues);
                allVenues.addAll(recommendedVenues);

                return allVenues;
            }).toObservable();
}

ошибка:

2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/MemorySource: getData() called
2019-11-15 09:18:08.703 29428-29491/com.example.suggest E/LocalSource: getData() called
2019-11-15 09:18:08.767 29428-29428/com.example.suggest E/MainViewModel: Query map was null (parameter #3)
...