Создать Flowable из цикла while - PullRequest
0 голосов
/ 27 мая 2019

Я довольно новичок в RxJava, и мне нужно создать репозиторий с несколькими источниками данных.Это сложно для меня, потому что есть несколько небольших подзадач, которые я не знаю, как реализовать с помощью RxJava.

Сначала у меня есть самопишущий дао, который обрабатывает InputStream и предоставляет Items в указанном диапазоне.В настоящее время он просто собирает данные в виде списка, но я хочу предоставить элементы один за другим, используя flowable;В настоящее время он владеет Maybe<List<Item>>.Также несколько ошибок должны быть переданы на более высокий уровень (источник данных).Например, EndOfFile, чтобы уведомить DataSource о том, что данные полностью кэшированы;

Dao.class:

List<Item> loadRange(int start, int number) throws ... {
    ...
    while(...) {
        ...
        //TODO contribute item to flowable
        resultList.add(new Item(...)) 

    }
    return resultList;
}

Maybe<List<Item>> только что созданный метод Maybe.fromCallable();

Пожалуйста, помогите мне!

1 Ответ

1 голос
/ 27 мая 2019

Что-то вроде этого должно работать для этого:

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(emitter -> {
            try {
                while (...){
                    emitter.onNext(new Item());
                }
                emitter.onComplete();
            } catch (IOException e) {
                emitter.onError(e);
            }
        }, BackpressureStrategy.BUFFER);
    }

Я предполагаю, что после завершения цикла вы хотите завершить его, а также отправлять ошибки в нисходящем направлении, а не обрабатывать сигнатуру метода.Также вы можете изменить BackPressureStrategy в соответствии с вашим вариантом использования, например DROP, LATEST и т. Д.

Поскольку вы новичок в RxJava, анонимный класс будет:

Flowable<Item> loadRange(int start, int number) {
        return Flowable.create(new FlowableOnSubscribe<Item>() {
            @Override public void subscribe(FlowableEmitter<Item> emitter) {
                try {
                    while (...){
                        emitter.onNext(new Item());
                    }
                    emitter.onComplete();
                } catch (IOException e) {
                    emitter.onError(e);
                }
            }
        }, BackpressureStrategy.BUFFER);
    }
...