Несколько параллельных вызовов API REST с использованием RX Observables - PullRequest
0 голосов
/ 17 февраля 2019

Я пытаюсь вызвать REST API параллельно, используя Observables.Я столкнулся с некоторой проблемой по этому вопросу.Я описал проблему здесь.Может ли кто-нибудь помочь мне в этом?

Мой пример использования: у меня есть 5 customerid, мне нужно вызвать другой API REST, передавая этот идентификатор пользователя одновременно, чтобы получить ответ.

, поэтому ярешил использовать Observables здесь для лучшей производительности для одновременного доступа к сервису.

Я попытался с опцией 3 ниже.но я чувствую, что все они обеспечивают одинаковое время отклика.Я не вижу никакой разницы во времени ответа на все эти вызовы

Может ли кто-нибудь найти ошибки в этом коде, если он ошибся.правильно ли я использовал Observables?

1) Observable.from(customerIds).flatMap(customerId -> 
                asyncUserRetrieve(customerId)
                .subscribeOn(Schedulers.io()))
                .subscribe(cust -> {
                        custDetails.add(cust);

                });

2) Observable.from(customerIds).flatMap(customerId -> 
                asyncUserRetrieve(customerId)
                .subscribe(cust -> {

                        custDetails.add(cust);
                });
3) for(String id : customerId) {
    Customer customer = asyncUserRetrieve(id).toBlocking().single();.
    custDetails.add(cust);
}

    @Override
    public Observable<Customer> asyncUserRetrieve(String customerId) {
            final URI uri = getURL(customerId);
            final Response response = httpClient.callForResponse(uri);
            if (response.getHttpStatus().is2xxSuccessful()) {
                Customer customer = getResponse(response, customerId);
                return Observable.just(customer);
            }
            return Observable.just(new Customer().setError(true));

        } 

1 Ответ

0 голосов
/ 19 февраля 2019

Проблема в реализации asyncUserRetrieve.Поэтому при вызове asyncUserRetreive происходит следующее:

  1. Сервер вызовов.
  2. Ожидание ответа сервера.<- <strong>Тема заблокирована
  3. Возвращает наблюдаемое.
  4. Возвращенное наблюдаемое излучает немедленно.


Вместо этого поток должен выглядеть примерно так:

  1. Немедленно возвращает Observable, который отправляет запрос к серверу
  2. Возвращает наблюдаемые выбросы, когда приходит ответ сервера.

Одна из возможных реализаций будет выглядеть следующим образом:

@Override
public Observable<Customer> asyncUserRetrieve(String customerId) {
    return Observable.fromCallable(() -> {
        final URI uri = getURL(customerId);
        final Response response = httpClient.callForResponse(uri);
        if (response.getHttpStatus().is2xxSuccessful()) {
            Customer customer = getResponse(response, customerId);
            return customer;
        }
        return new Customer().setError(true);

    } 
}

Обновление:

Если вы хотите, чтобы текущий поток ожидал, пока не придет весь ответ, попробуйте сделать что-то вроде этого:

RxJava 1

List<Customer> tempCustomers = Observable.from(customerIds)
    .flatMap(customerId -> {
        asyncUserRetrieve(customerId)
            .subscribeOn(Schedulers.io())
    })
    .toList()
    .toBlocking() // Do this only if you want to block the thread.
    .single()

custDetails.addAll(tempCustomers);

RxJava 2

List<Customer> tempCustomers = Observable.fromIterable(customerIds)
    .flatMap(customerId -> {
        asyncUserRetrieve(customerId)
            .subscribeOn(Schedulers.io())
    })
    .toList()
    .blockingGet()

custDetails.addAll(tempCustomers);
...