Методы doOnComplete, doOnTerminate или doOnError не вызываются - PullRequest
0 голосов
/ 12 июня 2019

У меня следующая ситуация: мне нужно получить данные из базы данных. Когда я пытаюсь войти в поток пользовательского интерфейса (вот как я это делаю):

List<Movie> movies = new ArrayList<>();

for (MovieEntity movie:movieDao.getFavorites()){
    movies.add(getMovie(movie));
}
mView.onMoviesLoaded(movies);

Все работает правильно

Но когда я делаю это в rx:

mCompositeDisposable.add(movieDao.getFavorites()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnSubscribe(disposable -> mView.showLoadingIndicator(true))
                .doOnError(throwable -> {
                    mView.showLoadingIndicator(false);
                    mView.showEmptyState(true);
                })
                .doOnTerminate(() -> mView.showLoadingIndicator(false))
                .doOnComplete(()->mView.showLoadingIndicator(false))
                .flatMapIterable(listObservable -> listObservable)
                .map(this::getMovie)
                .toList()
                .subscribe(list -> {
                    if (list.isEmpty()) {
                        mView.showEmptyState(true);
                    } else {
                        mView.onMoviesLoaded(list);
                    }
                })
        );

У меня постоянно есть индикатор загрузки. Я не могу понять, почему это происходит, поскольку я добавил showLoadingIndicator(false) для всех случаев (ошибки и успешные случаи). Итак, почему он не уходит и данные не отображаются?

1 Ответ

1 голос
/ 13 июня 2019

movieDao.getFavorites() - бесконечный поток. Использование .toList() в бесконечном потоке не имеет смысла, поскольку .toList() не излучает, пока не завершится исходный поток.

Есть два варианта:

  1. Сделайте ваш источник конечным

Сделать movieDao.getFavorites() вернуть Single<List<MovieEntity>> вместо Observable<List<MovieEntity>>

или просто поставьте .take(1) после источника.

mCompositeDisposable.add(movieDao.getFavorites()
                .take(1) // This will terminate the stream after it emits first item.
                .subscribeOn(Schedulers.io())
                ...
  1. Или не использовать .toList().

Сохраняйте свой бесконечный поток и продолжайте слушать изменения в базе данных.

mCompositeDisposable.add(movieDao.getFavorites()
                .map(movieEntities -> movieEntities.stream().map(this::getMovie).collect(Collectors.toList()))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(list -> {
                    if (list.isEmpty()) {
                        mView.showEmptyState(true);
                    } else {
                        mView.onMoviesLoaded(list);
                    }
                })
        );
...