Должны ли publisher.observerOn(scheduler).concatMap(mapper, 1)
и publisher.observerOn(scheduler).concatMapEager(mapper, 1, 1)
иметь одинаковое поведение, когда concatMapEager использует параллелизм с 1?
Я тестирую concatMap
и он может использовать другой поток вместо планировщика из observeOn
для вызова маппера.
например
Flowable.just(1, 2, 3, 4, 5)
.observeOn(Schedulers.io())
.concatMapEager(i -> {
System.out.println("1>>>>trigger in thread:" + Thread.currentThread());
return Flowable.create(emitter -> {
executorService.submit(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException ignore) {
}
emitter.onNext("" + i);
emitter.onComplete();
});
}, BackpressureStrategy.DROP);
}, 1, 1)
.map(i -> "ok:" + i)
.blockingSubscribe();
это печатает
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
но если изменить его на concatMap(mapper, 1)
, будет напечатано
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-1,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-2,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-3,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-4,5,main]
Должен ли concatMap
также использовать RxCachedThreadScheduler
? или это намеренно.