Java Stream уменьшает необъяснимое поведение - PullRequest
4 голосов
/ 19 сентября 2019

Может ли кто-нибудь, пожалуйста, указать мне правильное направление, поскольку я не могу понять проблему.

Я выполняю следующий метод.

private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");

    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {

                l.add(v);

                return l;
            }, (a, b) -> {                   
                a.addAll(b);
                return a;
            }

    );

   System.out.println(join);

}

Он печатает

[ноль, а, ноль, а]

Я не могу понять, почему он ставит два нулевых в результирующемсписок.Я ожидал, что ответ будет

[a, b]

, поскольку это параллельный поток, поэтому первый параметр для уменьшения

new ArrayList ()

, вероятно, будет вызываться дважды для каждого входного значения a и b.

Тогда функция аккумулятора, вероятно, будет вызвана дважды, так как она является параллельным потоком, и передаст каждый вход "a и b" в каждом вызове вместе со списками, представленными с помощью сеяных значений.Таким образом, a добавляется в список 1, а b добавляется в список 2 (или наоборот).После этого комбинатор объединит оба списка, но этого не произойдет.

Интересно, что если я помещу оператор print в свой аккумулятор, чтобы вывести значение ввода, выходной сигнал изменится.Таким образом, следующие

private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");

    List<String> join = vals.parallelStream().reduce(new ArrayList<String>(),
            (List<String> l, String v) -> {
                System.out.printf("l is %s", l);
                l.add(v);
                System.out.printf("l is %s", l);
                return l;
            }, (a, b) -> {
                a.addAll(b);
                return a;
            }

    );

   System.out.println(join);

}

приведут к таким выводам

l - это [] l - это [b] l - это [b, a] l - [b, a] [b, a, b, a]

Может кто-нибудь объяснить, пожалуйста.

Ответы [ 3 ]

5 голосов
/ 19 сентября 2019

Вы должны использовать Collections.synchronizedList() при работе с parallelStream().* * * * * * * * * * * * * * * * * * * * ArrayList * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

private static void reduce_parallelStream() {
    List<String> vals = Arrays.asList("a", "b");

    // Use Synchronized List when with parallelStream()
    List<String> join = vals.parallelStream().reduce(Collections.synchronizedList(new ArrayList<>()),
            (l, v) -> {
                l.add(v);
                return l;
            }, (a, b) -> a // don't use addAll() here to multiplicate the output like [a, b, a, b]
    );
    System.out.println(join);
}
* * * * * * * * * * *1008* * * * * * * * * * *1008*

Вывод:

Иногда вы получите такой вывод:

[a, b]

А иногда этот:

[b, a]

Причинапотому что это parallelStream(), поэтому вы не можете быть уверены в порядке исполнения.

3 голосов
/ 19 сентября 2019

, поскольку это параллельный поток, поэтому первый параметр для уменьшения new ArrayList(), вероятно, будет вызываться дважды для каждого входного значения a и b.

Вот где ты не прав.Первый параметр - это отдельный экземпляр ArrayList, , а не , лямбда-выражение может создавать несколько экземпляров ArrayList.

Следовательно, все редукция работает на одном экземпляре ArrayList.Когда несколько потоков изменяют этот ArrayList параллельно, результаты могут меняться при каждом выполнении.

Ваш combiner фактически добавляет все элементы List к одному List.

Вы можете получить ожидаемый вывод [a,b], если функции accumulator и combiner произведут новый ArrayList вместо изменения их ввода ArrayList:

List<String> join = vals.parallelStream().reduce(
     new ArrayList<String>(),
        (List<String> l, String v) -> {
            List<String> cl = new ArrayList<>(l);
            cl.add(v);
            return cl;
        }, (a, b) -> {
            List<String> ca = new ArrayList<>(a);
            ca.addAll(b);
            return ca;
        }
);

Тем не менее, вы не должны использовать reduce вообще.collect - это правильный способ выполнения изменчивого сокращения:

List<String> join = vals.parallelStream()
                        .collect(ArrayList::new,ArrayList::add,ArrayList::addAll);

Как вы можете видеть, здесь, в отличие от reduce, первый передаваемый вами параметр - Supplier<ArrayList<String>>, который можно использоватьсоздать столько промежуточных ArrayList экземпляров, сколько необходимо.

0 голосов
/ 19 сентября 2019

Это довольно просто, первый аргумент - это тождество , или я бы сказал, ноль для начала.Для parallelStream usage это значение равно повторно .Это означает, что проблемы с параллелизмом (ноль из надстройки) и дубликаты.

могут быть исправлены следующим образом:

    final ArrayList<String> zero = new ArrayList<>();
    List<String> join = vals.parallelStream().reduce(zero,
            (List<String> l, String v) -> {
                if (l == zero) {
                    l = new ArrayList<>();
                }
                l.add(v);
                return l;
            }, (a, b) -> {
                // See comment of Holger:
                if (a == zero) return b;
                if (b == zero) return a;

                a.addAll(b);
                return a;
            }
    );

Safe.

Вы можете спросить, почему reduceне имеет перегрузки для функции обеспечения идентичности.Причина в том, что collect должен был использоваться здесь.

...