подписка (Schedulers.parallel ()) не работает - PullRequest
0 голосов
/ 16 января 2019

Я изучаю активную зону реактора и слежу за этим https://www.baeldung.com/reactor-core

ArrayList<Integer> arrList = new ArrayList<Integer>();
System.out.println("Before: " + arrList);
Flux.just(1, 2, 3, 4)
  .log()
  .map(i -> i * 2)
  .subscribeOn(Schedulers.parallel())
  .subscribe(arrList::add);

System.out.println("After: " + arrList);

когда я выполняю вышеуказанную строку кода, выдает.

 Before: []
 [DEBUG] (main) Using Console logging
 After: []

Приведенные выше строки кода должны начинать выполнение в другом потоке, но он вообще не работает. Может ли кто-нибудь помочь мне в этом?

Ответы [ 2 ]

0 голосов
/ 17 января 2019

Как указано в документации Reactor для различных subscribe методов:

Имейте в виду, что, поскольку последовательность может быть асинхронной, это немедленно вернуть управление вызывающему потоку. Это может дать впечатление, что потребитель не вызывается при выполнении в главном потоке или юнит тест например.

Это означает, что конец основного метода достигнут, и, таким образом, основной поток завершает работу, прежде чем какой-либо поток сможет подписаться на цепочку Reactive, как упомянул Петр.

Что вы хотите сделать, это дождаться завершения всей цепочки, прежде чем печатать содержимое массива.

Наивный способ сделать это:

    ArrayList<Integer> arrList = new ArrayList<>();
    System.out.println("Before: " + arrList);
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(arrList::add)
            .blockLast();

    System.out.println("After: " + arrList);

Здесь вы блокируете выполнение в главном потоке до тех пор, пока не будет обработан последний элемент потока. Таким образом, последний System.out не будет выполняться, пока ваш ArrayList не будет полностью заполнен.

Помните, что способ запуска кода в консольном приложении по сравнению с серверной средой, такой как Netty, немного отличается. Единственный способ заставить консольное приложение ждать всех подписок, это block.

Но блокировка не разрешена в параллельных потоках. Таким образом, этот подход не будет работать, скажем, в среде Netty. Там ваш сервер будет работать до явного выключения, и поэтому subscribe будет в порядке.

Однако в приведенном выше фрагменте кода вы блокируете не только для предотвращения выхода из приложения, но и для ожидания, прежде чем читать заполненные данные.

Улучшение вышеуказанного кода будет следующим:

    ArrayList<Integer> arrList = new ArrayList<>();
    System.out.println("Before: " + arrList);
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .doOnNext(arrList::add)
            .doOnComplete(() -> System.out.println("After: " + arrList))
    .blockLast();

Даже здесь doOnComplete получает доступ к данным вне реактивной цепочки. Чтобы предотвратить это, вы должны собрать элементы Flux в самой цепочке, например:

    System.out.println("Before.");
    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .collectList()
            .doOnSuccess(list -> System.out.println("After: " + list))
    .block();

Опять же, помните, что при работе в Netty (скажем, в приложении Spring Webflux) приведенный выше код будет заканчиваться на subscribe().

Обратите внимание, что переход от Flux к списку (или любой коллекции) означает, что вы переключаетесь с реактивной парадигмы на императивное программирование. Вы должны иметь возможность реализовать любую функциональность в самой парадигме Reactive.

0 голосов
/ 16 января 2019

Я думаю, что есть некоторая путаница. Когда вы звоните subscribeOn(Schedulers.parallel()). Вы указываете, что хотите получать элементы в другой ветке. Кроме того, вы должны замедлить ваш код, чтобы подписчик действительно включился (вот почему я добавил Thread.sleep(100)). Если вы запустите код, который я передал, он работает. Вы видите, что в реакторе нет волшебного механизма синхронизации.

    ArrayList<Integer> arrList = new ArrayList<Integer>();

    Flux.just(1, 2, 3, 4)
            .log()
            .map(i -> i * 2)
            .subscribeOn(Schedulers.parallel())
            .subscribe(
                    t -> {
                        System.out.println(t + " thread id: " + Thread.currentThread().getId());
                        arrList.add(t);
                    }
            );
    System.out.println("size of arrList(before the wait): " + arrList.size());
    System.out.println("Thread id: "+ Thread.currentThread().getId() + ": id of main thread ");
    Thread.sleep(100);
    System.out.println("size of arrList(after the wait): " + arrList.size());

Если вы хотите добавить свои предметы в список в параллельном реакторе, это не лучший выбор. Лучше использовать параллельные потоки в Java 8.

List<Integer> collect = Stream.of(1, 2, 3, 4)
                .parallel()
                .map(i -> i * 2)
                .collect(Collectors.toList());

Тот урок, который вы опубликовали, не очень точен, когда речь идет о параллелизме. К чести автора, он / она говорит, что еще много статей впереди. Но я не думаю, что это должно публиковать этот конкретный пример вообще, поскольку это создает путаницу. Я предлагаю не доверять ресурсам в Интернете так много:)

...