Делать несколько асинхронных вызовов (запускать и забывать вызовы) одновременно, используя Rx Java Observable - PullRequest
0 голосов
/ 11 октября 2018

У меня есть список нижестоящих вызовов API (около 10), которые мне нужно вызывать одновременно асинхронно.До сих пор я использовал callables, где я использовал

List<RequestContextPreservingCallable <FutureResponse>> callables

. Я бы добавил вызовы API в этот список и отправлял его в конце, используя executeAsyncNoReturnRequestContextPreservingCallables.

Использование Rx Java-наблюдаемых Как мне сделатьсделать это?

List<RequestContextPreservingCallable<FutureResponse>> callables = new 
ArrayList<RequestContextPreservingCallable<FutureResponse>>();

callables.add(apiOneConnector.CallToApiOne(name));
callables.add(apiTwoConnector.CallToApiTWO(sessionId));
....

//execute all the calls
executeAsyncNoReturnRequestContextPreservingCallables(callables);

1 Ответ

0 голосов
/ 11 октября 2018

Вы можете использовать оператор zip.Оператор zip может взять несколько наблюдаемых и выполнить их одновременно, и он будет продолжен после получения всех результатов.

Затем вы можете преобразовать эти результаты в нужную форму и перейти на следующий уровень.

Согласно вашему примеру.Допустим, у вас есть несколько вызовов API для получения имени, сеанса и т. Д., Как показано ниже

Observable.zip(getNameRequest(), getSessionIdRequest(), new BiFunction<String, String, Object>() {
        @Override
        public Object apply(String name, String sessionId) throws Exception {
            // here you will get all the results once everything is completed. you can then take these 
            // results and transform into another object and returnm from here. I decided to transform the results into an Object[]
            // the retuen type of this apply funtion is generic, so you can choose what to return
            return new Object[]{name, sessionId};
        }
    })
    .subscribeOn(Schedulers.io())  // will start this entire chain in an IO thread
    .observeOn(AndroidSchedulers.mainThread()) // observeOn will filp the thread to the given one , so that the downstream will be executed in the specified thread. here I'm switching to main at this point onwards
    .subscribeWith(new DisposableObserver<Object>() {
        @Override
        public void onNext(Object finalResult) {
           // here you will get the final result with all the api results
        }

        @Override
        public void onError(Throwable e) {
            // any error during the entire process will be triggered here
        }

        @Override
        public void onComplete() {
             //will be called once the whole chain is completed and terminated
        }
    });

Вы можете даже передать список наблюдаемых в zip следующим образом

    List<Observable<String>> requests = new ArrayList<>();
    requests.add(getNameRequest());
    requests.add(getSessionIdRequest());

    Observable.zip(requests, new Function<Object[], Object[]>() {
        @Override
        public Object[] apply(Object[] objects) throws Exception {
            return new Object[]{objects[0], objects[1]};
        }
    }).subscribeWith(new DisposableObserver<Object[]>() {
        @Override
        public void onNext(Object[] objects) {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    })          
...