Наблюдаемый doOnNext поток-safet, когда объединение Наблюдаемых к одному Наблюдаемому - PullRequest
0 голосов
/ 10 января 2019

У меня есть список Observable, каждый Observable был подписан на планировщик пула, я слил список Observable в один Observable: Observable.merge (наблюдаемые). Является ли наблюдаемый результат слияния методом doOnNext потокобезопасным? Пример кода ниже.

    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("pooled-%s")
            .build();
    ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
    List<Observable<String>> observableList = Stream.of("1", "2", "3", "4")
            .map(o -> {
                        Observable<String> ob = Observable.create(
                                emitter -> {
                                    Thread.sleep(new Random().nextInt(50));
                                    System.out.println("emitter:" + o + " Thread:" + Thread.currentThread().getName());
                                    emitter.onNext(o);
                                    emitter.onComplete();
                                });
                        return ob.subscribeOn(Schedulers.from(executorService));
                    }
            ).collect(Collectors.toList());

    List<String> result = new ArrayList<>(); //is need thread safe list here ?

    Observable.merge(observableList)
            .doOnNext(e -> {
                Thread.sleep(new Random().nextInt(50));
                System.out.println("doOnNext:" + e + " Thread:" + Thread.currentThread().getName());
                result.add(e);
            })
            .blockingLast();

вариант 1: Когда я запускаю вывод, кажется, что doOnNext вызывается потоком сигналов.

    emitter:2 Thread:pooled-1
    emitter:3 Thread:pooled-2
    emitter:4 Thread:pooled-3
    emitter:1 Thread:pooled-0
    doOnNext:2 Thread:pooled-1
    doOnNext:1 Thread:pooled-1
    doOnNext:3 Thread:pooled-1
    doOnNext:4 Thread:pooled-1

вариант 2: Когда я отлаживаю шаг за шагом, выходные данные меняются, и кажется, что doOnNext вызывается многопоточным потоком.

    emitter:1 Thread:pooled-0
    doOnNext:1 Thread:pooled-0
    emitter:4 Thread:pooled-3
    emitter:3 Thread:pooled-2
    emitter:2 Thread:pooled-1
    doOnNext:4 Thread:pooled-3
    doOnNext:2 Thread:pooled-3
    doOnNext:3 Thread:pooled-3

Я был смущен.

  1. Я не видел никакого кода для переключения потоков между emitter и doOnNext, так почему «случай 1» возникает, если я пропускаю тот же код.
  2. Является ли doOnNext небезопасным в этом случае, и список результатов должен использовать поток-безопасный список.
  3. ArrayList для внутреннего использования метода Observable.toList имеет проблему параллелизма в этом случае.

1 Ответ

0 голосов
/ 10 января 2019

Потоки являются последовательными и гарантированно не перекрываются в операторах, особенно в объединяющих / координирующих. Тем не менее, это не исключает doOnNext «прыжка» между потоками. Если вы хотите, чтобы doOnNext всегда работал в одном и том же потоке, используйте observeOn перед ним.

Я не видел ни одного кода для переключения потоков между emitter и doOnNext, так почему возникает «случай 1», если я пропускаю один и тот же код.

Потоки могут вытолкнуть merge из любого количества потоков и протолкнуть его.

Является ли doOnNext небезопасным в этом случае, и список результатов должен использовать потокобезопасный список.

doOnNext является потокобезопасным в отношении своего входного параметра и выполнения. Если ваш обратный вызов с состоянием или захватывает глобальное состояние, несколько реализаций потока могут конфликтовать. Они должны быть синхронизированы с помощью внешних средств.

Внутреннее использование метода ArrayList метода Observable.toList в этом случае имеет проблему параллелизма.

toList гарантированно управляется без дублирования, а его внутренний ArrayList для каждого абонента, поэтому здесь нет проблем параллелизма.

...