Сокращение потока API используется на ArrayList не синхронизирован - PullRequest
1 голос
/ 22 апреля 2019

Я использую Stream API Reduce для тестирования списка массивов String.

for (int i = 0; i < 100; i++)
    {
        Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
        Predicate<String> predicate = t -> t.contains("a");

        List<String> strings2 = new ArrayList<>();
        s1.parallel().reduce(new ArrayList<String>(),
                new BiFunction<ArrayList<String>, String, ArrayList<String>>()
                {
                    @Override
                    public ArrayList<String> apply(ArrayList<String> strings, String s)
                    {
                        if (predicate.test(s))
                        {
                            strings.add(s);
                        }

                        return strings;
                    }
                }, new BinaryOperator<ArrayList<String>>()
                {
                    @Override
                    public ArrayList<String> apply(ArrayList<String> strings,
                            ArrayList<String> strings2)
                    {
                        return strings;
                    }
                }).stream().forEach( //
                        e -> {
                            strings2.add(e);
                        });

        if (strings2.contains(null))
        {
            System.out.println(strings2);
        }
    }
}

Я просмотрел несколько блогов, в которых говорится, что в этом случае можно использовать Reduce, и синхронизация может быть гарантирована,но приведенный выше случай выглядит так, как будто это не так.Этот тест является ИСТИННЫМ в парных тестовых прогонах,

strings2.contains(null)

, поэтому мой вопрос таков: неверен ли способ, которым я использую уменьшение, или нужно сделать что-то дополнительное, чтобы убедиться, что sych?

1 Ответ

1 голос
/ 23 апреля 2019

Похоже, filter лучше подходит для решения этой проблемы.Однако, если вы хотите использовать сокращение, и особенно при его параллельном использовании, вы не должны изменять объекты аккумулятора (списки в вашем случае).

Из Oracle учебник по сокращению :

функция аккумулятора также возвращает новое значение каждый раз , она обрабатывает элемент

Когда я запускаю ваш кодЯ получил две распечатки списка, содержащие null, а затем ArrayIndexOutOfBoundsException.Вероятная причина этого заключается в том, что два потока пытались добавить элементы в один и тот же список одновременно.Исключение произошло после того, как список был увеличен, но до добавления элемента, следовательно, слот null (то есть пустой).

ArrayList<String> strings2 = 
    s1.parallel()
      .reduce(new ArrayList<String>(), 
              (list, el) -> {
                if (el.contains("a")) {
                    ArrayList<String> added = new ArrayList<>(list);
                    added.add(el);
                    return added;
                }
                return list;
              }, 
              (list1, list2) -> {
                ArrayList<String> merged = new ArrayList<>(list1);
                merged.addAll(list2);
                    return merged;
              });

Вместо добавления в список, вы должны сделать копиюдобавьте к этой копии и верните копию.Таким образом, каждый поток может работать с разными частями ввода, не мешая другим.

Кроме того, вы не можете просто выбросить часть результата в сумматоре, иначе у вас получатся неполные результаты.Вы должны объединить списки, а не просто вернуть один из них.

...