У меня есть две наблюдаемые, использующие RxJava и Retrofit, и наблюдаемая с плоской картой
1) (tmdbApi.discoverymovies) вызывает API и получает список фильмов из определенного набора параметров
2) (addImdbIdToMovieList) принимает список фильмов в качестве параметра и возвращает этот список фильмов с добавленными в него imdbID.
Есть третья функция, которую я хотел бы запустить в фильмах, но яне могу найти где это разместить.Я не могу запустить его, пока с фильмом не связан ImdbId.Если возможно, я бы хотел запускать его для каждого фильма по мере его поступления, но без запуска нескольких элементов списка несколько раз, но при необходимости я могу подождать, пока список не будет завершен.
Куда мне обратиться?что берет в списке фильмов и что-то с этим делает?
private void getMovies(Params params) {
listOMovies = new ArrayList<>();
runTheOtherTwoObservables(params).
subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ArrayList<Movie>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(ArrayList<Movie> movieArrayList) {
listOMovies = movieArrayList;
}
@Override
public void onError(Throwable e) {
Log.e("zzzzz" , "err", e);
}
@Override
public void onComplete() {
Log.d("zzzzz", "running oncomplete");
for (Movie movie : listOMovies) {
Log.d("zzzzz", movie.getTitle() + ": " + movie.getImdbID());
}
}
});
}
private TmdbApi getTmdbApi() {
Retrofit.Builder builder = new Retrofit.Builder()
.baseUrl("https://api.themoviedb.org")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create());
Retrofit retrofit = builder.build();
return retrofit.create(TmdbApi.class);
}
private Observable<ArrayList<Movie>> addImdbIdToMovieList(ArrayList<Movie> listOfMoviesWithoutIds) {
TmdbApi tmdbApi = getTmdbApi();
return Observable.create(emitter -> {
for (Movie movie : listOfMoviesWithoutIds) {
tmdbApi.TMDBdetails(String.valueOf(movie.getTmdbID()), getResources().getString(R.string.tmdb_key), "en-us")
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Movie>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Movie movie) {
String imdbID = movie.getImdbID();
movie.setImdbID(imdbID);
Log.d("zzzzz", movie.getTitle() + " Set To " + movie.getImdbID());
}
@Override
public void onError(Throwable e) {
Log.e("zzzzz", "err", e);
}
@Override
public void onComplete() {
Log.d("zzzzz", "thrrd onComplete");
}
});
}
emitter.onNext(listOfMoviesWithoutIds);
emitter.onComplete();
});
}
public Observable<ArrayList<Movie>> runTheOtherTwoObservables(Params params) {
TmdbApi tmdbApi = getTmdbApi();
return tmdbApi.discoveryMovies(getResources().getString(R.string.tmdb_key),
"en-us",
"popularity.desc",
params.getInclude_adult(),
false,
params.getPrimary_release_date_min(),
params.getPrimary_release_date_max(),
params.getWith_genres())
.flatMap(response -> addImdbIdToMovieList(response.getResults()));
}