RxJava2 concatMap и concatMapEager от Flowable с наблюдать за имеют различное поведение - PullRequest
0 голосов
/ 28 марта 2019

Должны ли publisher.observerOn(scheduler).concatMap(mapper, 1) и publisher.observerOn(scheduler).concatMapEager(mapper, 1, 1) иметь одинаковое поведение, когда concatMapEager использует параллелизм с 1? Я тестирую concatMap и он может использовать другой поток вместо планировщика из observeOn для вызова маппера.

например

Flowable.just(1, 2, 3, 4, 5)
        .observeOn(Schedulers.io())
        .concatMapEager(i -> {
            System.out.println("1>>>>trigger in thread:" + Thread.currentThread());
            return Flowable.create(emitter -> {
                executorService.submit(() -> {
                    try {
                        Thread.sleep(3000L);
                    } catch (InterruptedException ignore) {
                    }
                    emitter.onNext("" + i);
                    emitter.onComplete();
                });
            }, BackpressureStrategy.DROP);
        }, 1, 1)
        .map(i -> "ok:" + i)
        .blockingSubscribe();

это печатает

1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]

но если изменить его на concatMap(mapper, 1), будет напечатано

1>>>>trigger in thread:Thread[RxCachedThreadScheduler-1,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-1,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-2,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-3,5,main]
1>>>>trigger in thread:Thread[pool-1-thread-4,5,main]

Должен ли concatMap также использовать RxCachedThreadScheduler? или это намеренно.

...