API
interface Client {
Observable<RouteDist> routeDist();
}
final class RouteDist {
}
final class ClientImpl implements Client {
@Override
public Observable<RouteDist> routeDist() {
return Observable.fromCallable(() -> {
// with this log, you see, that each subscription to an Observable is executed on the ThreadPool
// Log.e("---------------------", Thread.currentThread().getName());
return new RouteDist();
});
}
}
Применение потоков через подписку
final class ClientProxy implements Client {
private final Client api;
private final Scheduler scheduler;
ClientProxy(Client api, Scheduler scheduler) {
this.api = api;
this.scheduler = scheduler;
}
@Override
public Observable<RouteDist> routeDist() {
// apply #subscribeOn in order to move subscribeAcutal call on given Scheduler
return api.routeDist().subscribeOn(scheduler);
}
}
AndroidTest
@Test
public void name() {
// CachedThreadPool, in order to avoid creating 100-Threads or more. It is always a good idea to use own Schedulers (e.g. Testing)
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(0, 10,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>());
// wrap real client with Proxy, in order to move the subscribeActual call to the ThreadPool
Client client = new ClientProxy(new ClientImpl(), Schedulers.from(threadPool));
List<Observable<RouteDist>> observables = Arrays.asList(client.routeDist(), client.routeDist(), client.routeDist());
TestObserver<List<RouteDist>> test = Observable.zip(observables, objects -> {
return Arrays.stream(objects).map(t -> (RouteDist) t).collect(Collectors.toList());
})
.observeOn(AndroidSchedulers.mainThread())
.test();
test.awaitCount(1);
// verify that onNext in subscribe is called in Android-EventLoop
assertThat(test.lastThread()).isEqualTo(Looper.getMainLooper().getThread());
// verify that 3 calls were made and merged into one List
test.assertValueAt(0, routeDists -> {
assertThat(routeDists).hasSize(3);
return true;
});
}
Дополнительная информация:
http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html
Примечание. Не рекомендуется вызывать API 100 раз одновременно. Более того, когда вы используете Zip, это действительно происходит, когда у вас есть ThreadPool, который достаточно велик. По истечении времени ожидания одного вызова API для этих вызовов API, вероятно, будет выдано onError. Ошибка onError будет распространена далее на подписчика. Вы не получите никакого результата, даже если только при API-вызове произойдет сбой. Рекомендуется иметь некоторый onErrorResumeNext или некоторый другой оператор обработки ошибок, чтобы гарантировать, что один вызов API не отменяет общий результат.