Как разместить асинхронные вызовы дооснащения с использованием rx java. Я должен сделать более 100 вызовов асинхронно - PullRequest
1 голос
/ 02 мая 2020

Вот пример кода, над которым я работал

элементы содержат 100 элементов, поэтому получение данных с использованием синхронного вызова занимает много времени. Может кто-нибудь предложить способ увеличить скорость этой операции, чтобы она занимала меньше времени. В настоящее время выполнение занимает 15-20 секунд. Я новичок в rx java, поэтому, если возможно, предоставьте подробное решение этой проблемы. dataResponses содержит объекты RouteDistance для каждого из 100 элементов.

for(int i = 0 ; i<items.size();i++){

    Map<String, String> map2 = new HashMap<>();

    map2.put("units", "metric");
    map2.put("origin", currentLocation.getLatitude()+","+currentLocation.getLongitude());
    map2.put("destination", items.get(i).getPosition().get(0)+","+items.get(i).getPosition().get(1));
    map2.put("transportMode", "car");
    requests.add(RetrofitClient4_RouteDist.getClient().getRouteDist(map2));
}

Observable.zip(requests,  new Function<Object[], List<RouteDist>>() {
    @Override
    public List<RouteDist> apply(Object[] objects) throws Exception {
        Log.i("onSubscribe", "apply: " + objects.length);
        List<RouteDist> dataaResponses = new ArrayList<>();
        for (Object o : objects) {
            dataaResponses.add((RouteDist) o);
        }
        return dataaResponses;
    }
})
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(Schedulers.io())
        .subscribe(
                new Consumer<List<RouteDist>>() {
                    @Override
                    public void accept(List<RouteDist> dataaResponses) throws Exception {
                        Log.i("onSubscribe", "YOUR DATA IS HERE: "+dataaResponses.toString());
                        recyclerViewAdapter_profile = new RecyclerViewAdapter_Profile(items,dataaResponses);
                        recyclerView.setAdapter(recyclerViewAdapter_profile);
                    }
                },

                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable e) throws Exception {
                        Log.e("onSubscribe", "Throwable: " + e);
                    }
                });

1 Ответ

0 голосов
/ 03 мая 2020

API

interface Client {
    Observable<RouteDist> routeDist();
}

final class RouteDist {

}


final class ClientImpl implements Client {
    @Override
    public Observable<RouteDist> routeDist() {
        return Observable.fromCallable(() -> {
            // with this log, you see, that each subscription to an Observable is executed on the ThreadPool
            // Log.e("---------------------", Thread.currentThread().getName());
            return new RouteDist();
        });
    }
}

Применение потоков через подписку

final class ClientProxy implements Client {
    private final Client api;
    private final Scheduler scheduler;

    ClientProxy(Client api, Scheduler scheduler) {
        this.api = api;
        this.scheduler = scheduler;
    }

    @Override
    public Observable<RouteDist> routeDist() {
        // apply #subscribeOn in order to move subscribeAcutal call on given Scheduler
        return api.routeDist().subscribeOn(scheduler);
    }
}

AndroidTest

@Test
public void name() {
    // CachedThreadPool, in order to avoid creating 100-Threads or more. It is always a good idea to use own Schedulers (e.g. Testing)
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(0, 10,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<>());

    // wrap real client with Proxy, in order to move the subscribeActual call to the ThreadPool
    Client client = new ClientProxy(new ClientImpl(), Schedulers.from(threadPool));

    List<Observable<RouteDist>> observables = Arrays.asList(client.routeDist(), client.routeDist(), client.routeDist());

    TestObserver<List<RouteDist>> test = Observable.zip(observables, objects -> {
        return Arrays.stream(objects).map(t -> (RouteDist) t).collect(Collectors.toList());
    })
            .observeOn(AndroidSchedulers.mainThread())
            .test();

    test.awaitCount(1);

    // verify that onNext in subscribe is called in Android-EventLoop
    assertThat(test.lastThread()).isEqualTo(Looper.getMainLooper().getThread());
    // verify that 3 calls were made and merged into one List
    test.assertValueAt(0, routeDists -> {
        assertThat(routeDists).hasSize(3);
        return true;
    });
}

Дополнительная информация:

http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

Примечание. Не рекомендуется вызывать API 100 раз одновременно. Более того, когда вы используете Zip, это действительно происходит, когда у вас есть ThreadPool, который достаточно велик. По истечении времени ожидания одного вызова API для этих вызовов API, вероятно, будет выдано onError. Ошибка onError будет распространена далее на подписчика. Вы не получите никакого результата, даже если только при API-вызове произойдет сбой. Рекомендуется иметь некоторый onErrorResumeNext или некоторый другой оператор обработки ошибок, чтобы гарантировать, что один вызов API не отменяет общий результат.

...