Будет ли параллельный поток работать нормально с отдельной операцией? - PullRequest
0 голосов
/ 06 декабря 2018

Я читал о безгражданстве и сталкивался с этим в doc :

Результаты конвейерного потока могут быть недетерминированными или неверными, если поведенческие параметры для операций потока являются состоящими.Лямбда с состоянием (или другой объект, реализующий соответствующий функциональный интерфейс) - это тот, чей результат зависит от любого состояния, которое может измениться во время выполнения потокового конвейера.

Теперь, если у меня есть список строк(strList скажем), а затем пытается удалить дублирующиеся строки из него, используя параллельные потоки, следующим образом:

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());

или если мы хотим, чтобы регистр не учитывался:

List<String> result2 = strList.parallelStream().map(String::toLowerCase)
                       .distinct().collect(Collectors.toList());

Можету этого кода есть какие-либо проблемы, так как параллельные потоки будут разделять входные данные и различать их в одном фрагменте, не обязательно означает различное во всем входном сигнале?

РЕДАКТИРОВАТЬ (краткий обзор ответов ниже)

The distinct - это операция с состоянием, и в случае промежуточных операций с состоянием параллельные потоки могут потребовать многократных проходов или существенных издержек буферизации.Также distinct может быть реализовано более эффективно, если упорядочение элементов не имеет значения.Также согласно doc :

Для упорядоченных потоков выбор отдельных элементов является стабильным (для дублированных элементов элемент, появляющийся первым в порядке встречи, сохраняется.) Для неупорядоченныхПотоки, гарантии стабильности не предоставляются.

Но в случае, если упорядоченный поток, работающий параллельно, различимый может быть нестабильным - это означает, что он будет содержать произвольный элемент в случае дубликатов и не обязательно первый, как ожидалосьот distinct в противном случае.

Из ссылки :

Внутри операции Different () сохраняется набор, содержащий элементы, которые были видны ранее, но он скрыт внутри операции.и мы не можем получить его из кода приложения.

Так что в случае параллельных потоков он, вероятно, будет потреблять весь поток или может использовать CHM (sth, как ConcurrentHashMap.newKeySet()).А для заказанных, скорее всего, будет использоваться LinkedHashSet или аналогичный.

Ответы [ 4 ]

0 голосов
/ 06 декабря 2018

В документации, которую вы предоставляете, и в фактическом примере вы, похоже, упускаете довольно много вещей.

Результаты конвейерного потока могут быть недетерминированными или неверными, если поведенческие параметры для потоковых операций stateful .

В вашем примере у вас нет операций с состоянием, определенных вами .Состояние в документе означает те, которые вы определяете, а не те, которые реализованы самим jdk - например, distinct в вашем примере.Но в любом случае вы могли бы определить правильную операцию с состоянием, даже Стюарт Маркс, работающий в Oracle / Java, предоставляет такой пример .

Так что вы более чем нормальны в примерахчто вы предоставляете, будь то параллельное или нет.

Дорогая часть distinct (параллельно) проистекает из того факта, что внутри должна существовать поточно-ориентированная структура данных, которая будет содержать различные элементы;в случае JDK это ConcurrentHashMap используется в случае, если заказ не имеет значения, или сокращение с использованием LinkedHashSet, когда заказ имеет значение.

distinct Кстати, это довольно умная реализация, она выглядит, если ваш источник потока уже различен (в таком случае это неоперация), или смотрит, отсортированы ли ваши данные, в которыхв этом случае он будет немного более разумно обходить источник (так как он знает, что если вы видели один элемент, то следующий будет либо тот же, который вы только что видели, либо другой), либо использование ConcurrentHashMap для внутреннего использования и т. д.

0 голосов
/ 06 декабря 2018

Грубо указывает на соответствующие части doc ( Акцент на шахте ):

Промежуточные операции далее делятся на лица без состоянияи операции с состоянием .Операции без сохранения состояния, такие как фильтр и отображение, не сохраняют состояния от ранее увиденного элемента при обработке нового элемента - каждый элемент может обрабатываться независимо от операций над другими элементами. Операции с состоянием, такие как отдельные и отсортированные, могут включать в себя состояние из ранее видимых элементов при обработке новых элементов

Операциям с состоянием может потребоваться обработать весь ввод перед выдачей результата .Например, невозможно произвести какие-либо результаты от сортировки потока до тех пор, пока не будут просмотрены все элементы потока. В результате при параллельных вычислениях для некоторых конвейеров, содержащих промежуточные операции с состоянием, может потребоваться несколько проходов данных или может потребоваться буферизация значимых данных .Конвейеры, содержащие исключительно промежуточные операции без сохранения состояния, могут быть обработаны за один проход, последовательный или параллельный, с минимальной буферизацией данных

Если вы читаете дальше (раздел о порядке):

Потоки могут иметь или не иметь определенный порядок встречи.Наличие у потока порядка встречи зависит от источника и промежуточных операций. Некоторые источники потоков (такие как List или массивы) упорядочены по своей природе, тогда как другие (такие как HashSet) не упорядочены.Некоторые промежуточные операции, такие как sorted (), могут навязывать порядок обращения к неупорядоченному потоку, в противном случае , а другие могут отображать неупорядоченный упорядоченный поток, такой как BaseStream.unordered ().Кроме того, некоторые терминальные операции могут игнорировать порядок встреч, например forEach ().

...

Для параллельных потоков ослабление ограничения порядка может иногда позволить более эффективныйвыполнение. Некоторые агрегированные операции, такие как фильтрация дубликатов (Different ()) или групповые сокращения (Collectors.groupingBy ()), могут быть реализованы более эффективно, если упорядочение элементов не имеет значения .Точно так же операции, которые по своей природе связаны с порядком, например, limit (), могут потребовать буферизации для обеспечения правильного упорядочения, что подрывает преимущества параллелизма. В случаях, когда поток имеет порядок встречи, но пользователь не особенно заботится об этом порядке встречи, явное удаление порядка потока с помощью unordered () может улучшить параллельную производительность для некоторых операций с состоянием или терминальных операций .Однако большинство потоковых конвейеров, таких как приведенный выше пример «суммы весов блоков», по-прежнему эффективно распараллеливаются даже при ограничениях порядка.

В заключение,

  • будет отличатьсяотлично работает с параллельными потоками, но, как вы, возможно, уже знаете, он должен использовать весь поток, прежде чем продолжить, и это может занять много памяти.
  • Если источником элементов является неупорядоченная коллекция (например,hashset) или поток равен unordered(), тогда distinct не беспокоится о порядке вывода и, следовательно, будет эффективен

Решение - добавить .unordered() в конвейер потока, если вы небеспокоюсь о порядке и хотел бы видеть больше производительности.

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());

Увы, в Java нет (доступного встроенного) одновременного хэш-набора (если они не стали умными с ConcurrentHashMap), так что я могу только оставить вас с прискорбной возможностью того, что отличный способ реализован блокирующим способом с использованием обычного набора Java.В этом случае я не вижу никакой пользы от проведения параллельных различий.


Редактировать: я говорил слишком рано.Там может быть некоторая выгода с использованием параллельных потоков с различными.Похоже, что distinct реализован с большим умом, чем я думал.См. @ Евгения ответ .

0 голосов
/ 06 декабря 2018

Из javadocs, parallelStream ()

Возвращает возможно параллельный поток с этой коллекцией в качестве источника.Для этого метода допустимо возвращать последовательный поток.

Производительность:

  1. Давайте рассмотрим, что у нас есть несколько потоков (к счастью ), который предоставляется различным ядрам процессора.ArrayList<T>, который имеет внутреннее представление данных на основе массива.Или LinkedList<T>, который требует дополнительных вычислений для параллельной обработки разбиения.ArrayList<T> лучше в этом случае!
  2. stream.unordered().parallel().distinct() имеет лучшую производительность, чем stream.parallel().distinct()

Сохранение стабильности для Different () в параллельных конвейерах относительно дорого (требует, чтобы операция действовала как полный барьер, с существенными накладными расходами буферизации).

Таким образом, в вашем случае это не должно быть проблемой (Если ваш List<T> не делаетзабота о порядке).Прочитайте объяснение ниже:

Допустим, у вас есть 4 элемента в ArrayList: {"a", "b", "a", "b"}

Теперь, если вы не't использовать parallelStream() перед вызовом distinct(), сохраняется только строка в позициях 0 и 1. (Сохраняет порядок, последовательный поток)

В остальном (если вы используете parallelStream().distinct())тогда элементы в 1 и 2 могут быть сохранены как отдельные (это нестабильно, но результат тот же {"a," b "} или это может быть даже {" b "," a "}).

Нестабильная отдельная операция случайным образом удалит дубликаты.

Наконец,

при параллельных вычислениях, некоторые конвейеры, содержащие промежуточные операции с состоянием, могут потребовать нескольких проходов наданные или может потребоваться буферизовать важные данные

0 голосов
/ 06 декабря 2018

Не будет проблемы (проблема, как в неправильном результате), но, как сказано в примечании API ,

Сохранение стабильности для Different () в параллельных конвейерах относительнодорогой

Но если производительность вызывает беспокойство и если стабильность не является проблемой (т. е. результат имеет другой порядок элементов в отношении коллекции, которую он обработал), то вы следуетеПримечание API

, устраняющее ограничение упорядочения с помощью BaseStream.unordered (), может привести к значительно более эффективному выполнению Different () в параллельных конвейерах,

Я подумал, почему неттест производительности параллельных и последовательных потоков для distinct

public static void main(String[] args) {
        List<String> strList = Arrays.asList("cat", "nat", "hat", "tat", "heart", "fat", "bat", "lad", "crab", "snob");

        List<String> words = new Vector<>();


        int wordCount = 1_000_000; // no. of words in the list words
        int avgIter = 10; // iterations to run to find average running time

        //populate a list randomly with the strings in `strList`
        for (int i = 0; i < wordCount; i++) 
            words.add(strList.get((int) Math.round(Math.random() * (strList.size() - 1))));





        //find out average running times
        long starttime, pod = 0, pud = 0, sod = 0;
        for (int i = 0; i < avgIter; i++) {
            starttime = System.currentTimeMillis();
            List<String> parallelOrderedDistinct = words.parallelStream().distinct().collect(Collectors.toList());
            pod += System.currentTimeMillis() - starttime;

            starttime = System.currentTimeMillis();
            List<String> parallelUnorderedDistinct =
                    words.parallelStream().unordered().distinct().collect(Collectors.toList());
            pud += System.currentTimeMillis() - starttime;

            starttime = System.currentTimeMillis();
            List<String> sequentialOrderedDistinct = words.stream().distinct().collect(Collectors.toList());
            sod += System.currentTimeMillis() - starttime;
        }

        System.out.println("Parallel ordered time in ms: " + pod / avgIter);
        System.out.println("Parallel unordered time in ms: " + pud / avgIter);
        System.out.println("Sequential implicitly ordered time in ms: " + sod / avgIter);
    }

Выше было скомпилировано open-jdk 8 и выполнено на jre 8 openjdk (без аргументов, специфичных для jvm) на i3 6-го поколения (4 логических ядра)) и я получил эти результаты

Похоже, после определенного нет.из элементов упорядоченная параллель была быстрее, а по иронии судьбы неупорядоченная была самой медленной.Причиной этого (благодаря @Hulk) является то, как он реализован (с использованием HashSet). Так что общее правило будет таким: если вы используете несколько элементов и много дублирующихся на несколько величин больше, вы можете извлечь выгоду из parallel().

1)

Parallel ordered time in ms: 52
Parallel unordered time in ms: 81
Sequential implicitly ordered time in ms: 35

2)

Parallel ordered time in ms: 48
Parallel unordered time in ms: 83
Sequential implicitly ordered time in ms: 34

3)

Parallel ordered time in ms: 36
Parallel unordered time in ms: 70
Sequential implicitly ordered time in ms: 32

Неупорядоченная параллель была в два раза медленнее, чем обе.

Затем я поднял wordCount до 5_000_000, и это были результаты

1)

Parallel ordered time in ms: 93
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 123

2)

Parallel ordered time in ms: 100
Parallel unordered time in ms: 363
Sequential implicitly ordered time in ms: 124

3)

Parallel ordered time in ms: 89
Parallel unordered time in ms: 365
Sequential implicitly ordered time in ms: 118

, а затем 10_000_000

1)

Parallel ordered time in ms: 148
Parallel unordered time in ms: 725
Sequential implicitly ordered time in ms: 218

2)

Parallel ordered time in ms: 150
Parallel unordered time in ms: 749
Sequential implicitly ordered time in ms: 224

3)

Parallel ordered time in ms: 143
Parallel unordered time in ms: 743
Sequential implicitly ordered time in ms: 222
...