У меня есть список Observable, каждый Observable был подписан на планировщик пула, я слил список Observable в один Observable:
Observable.merge (наблюдаемые). Является ли наблюдаемый результат слияния методом doOnNext потокобезопасным? Пример кода ниже.
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("pooled-%s")
.build();
ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
List<Observable<String>> observableList = Stream.of("1", "2", "3", "4")
.map(o -> {
Observable<String> ob = Observable.create(
emitter -> {
Thread.sleep(new Random().nextInt(50));
System.out.println("emitter:" + o + " Thread:" + Thread.currentThread().getName());
emitter.onNext(o);
emitter.onComplete();
});
return ob.subscribeOn(Schedulers.from(executorService));
}
).collect(Collectors.toList());
List<String> result = new ArrayList<>(); //is need thread safe list here ?
Observable.merge(observableList)
.doOnNext(e -> {
Thread.sleep(new Random().nextInt(50));
System.out.println("doOnNext:" + e + " Thread:" + Thread.currentThread().getName());
result.add(e);
})
.blockingLast();
вариант 1:
Когда я запускаю вывод, кажется, что doOnNext вызывается потоком сигналов.
emitter:2 Thread:pooled-1
emitter:3 Thread:pooled-2
emitter:4 Thread:pooled-3
emitter:1 Thread:pooled-0
doOnNext:2 Thread:pooled-1
doOnNext:1 Thread:pooled-1
doOnNext:3 Thread:pooled-1
doOnNext:4 Thread:pooled-1
вариант 2:
Когда я отлаживаю шаг за шагом, выходные данные меняются, и кажется, что doOnNext вызывается многопоточным потоком.
emitter:1 Thread:pooled-0
doOnNext:1 Thread:pooled-0
emitter:4 Thread:pooled-3
emitter:3 Thread:pooled-2
emitter:2 Thread:pooled-1
doOnNext:4 Thread:pooled-3
doOnNext:2 Thread:pooled-3
doOnNext:3 Thread:pooled-3
Я был смущен.
- Я не видел никакого кода для переключения потоков между emitter и doOnNext, так почему «случай 1» возникает, если я пропускаю тот же код.
- Является ли doOnNext небезопасным в этом случае, и список результатов должен использовать поток-безопасный список.
- ArrayList для внутреннего использования метода Observable.toList имеет проблему параллелизма в этом случае.