Как сделать два запроса в одном запросе с помощью rxJava2 - PullRequest
1 голос
/ 25 июня 2019

Я получаю доступ к серверу в моем приложении для Android. Я хочу получить список моих друзей и список запросов на добавление в разные запросы. Они должны прийти в одно и то же время. Затем я хочу показать эти данные на экране.

Я пытался получить данные из двух запросов при использовании flatMap. interactor.getColleagues() и interactor.getTest() возвращает тип данных Observable<List<Colleagues>>

private fun loadColleaguesEmployer() {
        if (disposable?.isDisposed == true) disposable?.dispose()
        //запрос на список друзей
        interactor.getColleagues(view.getIdUser() ?: preferences.userId)
            .subscribeOn(Schedulers.io())
            .flatMap {
                interactor.getTest().subscribeOn(Schedulers.io())
                    .doOnNext {
                            result-> view.showTest(mapper.map(result))
                    }
            }
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeBy(
                onNext = { result1 ->
                    //Обработка списка коллег работодателей
                    view.showColleagues(mapper.map(result1.filter { data -> data.typeFriend == "Работодатель" }))
                },
                onError = { it.printStackTrace() }
            )
    }

Я хочу получать и обрабатывать данные из разных запросов одновременно.

Ответы [ 4 ]

0 голосов
/ 25 июня 2019

Если обе наблюдаемые возвращают один и тот же тип данных, и вы не против смешивать данные обоих источников - рассмотрите возможность использования Observable.merge ()

Например:

 Observable.merge(interactor.getColleagues(), interactor.getTest())
                .subscribeOn(Schedulers.io())
                .subscribe(
                    (n) -> {/*do on next*/ },
                    (e) -> { /*do on error*/ });

Обратите внимание, что .merge() оператор не заботится о порядке выбросов.

0 голосов
/ 25 июня 2019

Объединение наблюдаемых результатов нескольких асинхронных http-запросов с rxjava Observable.zip.

public class Statistics {

    public static void main(String[] args) {

        List<Observable<ObservableHttpResponse>> observableRequests = Arrays.asList(
                Http.getAsync("http://localhost:3001/stream"),
                Http.getAsync("http://localhost:3002/stream"),
                Http.getAsync("http://localhost:3003/stream"),
                Http.getAsync("http://localhost:3004/stream"));

        List<Observable<Stats>> observableStats = observableRequests.stream()
                .map(observableRequest ->
                        observableRequest.flatMap(response ->
                                response.getContent()
                                        .map(new EventStreamJsonMapper<>(Stats.class))))
                .collect(toList());

        Observable<List<Stats>> joinedObservables = Observable.zip(
                observableStats.get(0),
                observableStats.get(1),
                observableStats.get(2),
                observableStats.get(3),
                Arrays::asList);

        // This does not work, as FuncN accepts (Object...) https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/functions/FuncN.java#L19
        // Observable<List<Stats>> joinedObservables = Observable.zip(observableStats, Arrays::asList);

        joinedObservables
                .take(10)
                .subscribe(
                        (List<Stats> statslist) -> {
                            System.out.println(statslist);

                            double average = statslist.stream()
                                    .mapToInt(stats -> stats.ongoingRequests)
                                    .average()
                                    .getAsDouble();

                            System.out.println("avg: " + average);
                        },
                        System.err::println,
                        Http::shutdown);

    }
}
0 голосов
/ 25 июня 2019

вы можете сделать это с помощью простой операции zip, например

private fun callRxJava() {

    RetrofitBase.getClient(context).create(Services::class.java).getApiName()
        .subscribeOn(Schedulers.single())
        .observeOn(AndroidSchedulers.mainThread())
    getObservable()
        .flatMap(object : io.reactivex.functions.Function<List<User>, Observable<User>> {
            override fun apply(t: List<User>): Observable<User> {
                return Observable.fromIterable(t); // returning user one by one from usersList.
            } // flatMap - to return users one by one

        })


        .subscribe(object : Observer<User> {
            override fun onSubscribe(d: Disposable) {
                showProgressbar()

            }

            override fun onNext(t: User) {
                userList.add(t)
                hideProgressBar()
            }

            override fun onError(e: Throwable) {
                Log.e("Error---", e.message)
                hideProgressBar()
            }

            override fun onComplete() {
                userAdapter.notifyDataSetChanged()
            }


        })

}

эта функция объединяет ваш ответ на 2 запроса

private fun getObservable(): Observable<List<User>> {
    return Observable.zip(
        getCricketFansObservable(),
        getFootlaballFansObservable(),
        object : BiFunction<List<User>, List<User>, List<User>> {
            override fun apply(t1: List<User>, t2: List<User>): List<User> {
                val userList = ArrayList<User>()
                userList.addAll(t1)
                userList.addAll(t2)
                return userList
            }

        })
}

вот пример первой наблюдаемой

fun getCricketFansObservable(): Observable<List<User>> {
    return RetrofitBase.getClient(context).create(Services::class.java).getCricketers().subscribeOn(Schedulers.io())
}
0 голосов
/ 25 июня 2019

Zip объединяет выбросы нескольких наблюдаемых вместе с помощью указанной функции

Вы можете использовать Zip (rx Java) http://reactivex.io/documentation/operators/zip.html, некоторый код sudoбудет так -

val firstApiObserver = apIs.hitFirstApiFunction(//api parameters)
val secondApiObserver = apIs.hitSecondApiFunction(//api parameters)

val zip: Single<SubscriptionsZipper>//SubscriptionsZipper is the main model which contains first& second api response model ,
zip = Single.zip(firstApiObserver, secondApiObserver, BiFunction { firstApiResponseModel,secondApiResponseModel -> SubscriptionsZipper(firstApiResponseModelObjectInstance, secondApiResponseModelObjectInstance) })

zip.observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(object : SingleObserver<SubscriptionsZipper> {
            override fun onSubscribe(d: Disposable) {
                compositeDisposable.add(d)
            }

            override fun onSuccess(subscriptionsZipper: SubscriptionsZipper) {
                Utils.hideProgressDialog()
               //here you will get both api response together
            }

            override fun onError(e: Throwable) {
                Utils.hideProgressDialog()
            }
        })

Надеюсь, это поможет вам.

...