Flowable из Cache и других Flowable для DataSource с использованием RxJava - PullRequest
0 голосов
/ 27 мая 2019

Я довольно новичок в RxJava, и мне нужно создать репозиторий с несколькими источниками данных.Это сложно для меня, потому что есть несколько меньших подзадач, которые я не знаю, как реализовать с помощью RxJava.

У меня есть Dao, который предоставляет Flowable<Item> в некотором диапазоне для класса DataSource.Этот источник данных имеет локальный кеш, который может быть аннулирован в любое время.Когда хранилище запрашивает у DataSource некоторый диапазон (который может выходить за пределы DataSourse, границы неизвестны до полного кэширования), это должно вызвать ошибку (или уведомить хранилище другим способом).

Я хочу создать Flowable<Item>метод для источника данных, который будет генерировать элементы из кэша и, если необходимо, объединять их с Flowable<Item> dao.getRange(...), в то же время кэшируя новые элементы, поступающие из dao.Также мне нужно делать вещи с ошибками, исходящими от dao, они должны быть обработаны или преобразованы в ошибки более высокого уровня.

DataSource.class:

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

    final int cacheSize = cache.size();
    final int canLoadFromCache = cacheSize - start;
    final int loadFromDao = amount - canLoadFromCache;

    if (isCorrupted) return Flowable.fromCallable(() -> {
        throw new Exception("CorruptedDatasource");
    });

    Flowable<Item> cacheFlow = null;
    Flowable<Item> daoFlow = null;

    if (canLoadFromCache > 0) {
        cacheFlow = Flowable.fromIterable(
                cache.subList(start, canLoadFromCache)
        );

        daoFlow = dao.getRange(
                uri, 
                cacheSize, //start
                loadFromDao //amount
        );
    } else {
        if (isFullyCached) return Flowable.fromCallable(() -> {
            throw new Exception("OutOfBounds");
        });

        //To not deal with gaps load and cache data between;
        //Or replace it with data structure which can handle for us;
        daoFlow = dao.getRange(
                uri,
                cacheSize,
                start - cacheSize + amount);
        //all these items should be cached;
        //other cached and put downstream;
        //Dao errs should be converted to higher lever exceptions,
        //Or set flags in DataSource;
    }
    // return concatenated flowable;
}

В хранилище более высокого уровня объединяются поступающие данныеиз нескольких источников данных, поэтому должен быть способ канкатинитировать диапазоны, поступающие из нескольких источников, таким образом, если один из них еще не достигнут, следует добавить диапазон из следующего.

Пожалуйста, помогите мне!

1 Ответ

0 голосов
/ 27 мая 2019

Попробуйте concat или concatEager, который объединяет две наблюдаемые. Также doOnNext() или doOnError() могут помочь вам в кешировании и обработке ошибок

List<Item> cache;

Flowable<Item> getRange(int start, int amount) {

    ...
        if (isFullyCached) return Flowable.fromCallable(() -> {
            throw new Exception("OutOfBounds");
        });

        //To not deal with gaps load and cache data between;
        //Or replace it with data structure which can handle for us;
        daoFlow = dao.getRange(
                uri,
                cacheSize,
                start - cacheSize + amount);
        //all these items should be cached;
        //other cached and put downstream;
            .doOnNext(result -> /* insert caching logic here */)
        //Dao errs should be converted to higher lever exceptions,
        //Or set flags in DataSource;
            .doOnError(error -> /* handle error here */)
            .onErrorReturn(/* and/or return some empty item */)
    }
    // return concatenated flowable;
    return cacheFlow.concat(daoFlow);
}
...