Прежде всего, у вас есть , чтобы использовать parallel
, чтобы Fork-Join Pool
был активным. Этот ответ немного объясняет, что такое Spliterator
s, как выполняется разбиение;но в простых словах разбиение выполняется с использованием источника элементов потока, и весь конвейер обрабатывается параллельно.В вашем примере это filter
и map
, как вы выразились (конечно, это включает и операцию terminal
).
Для операций с состоянием - все сложнее.Давайте возьмем distinct
, например, и сначала посмотрим, как он обрабатывает вещи для последовательного случая.
В общем, вы можете подумать, что non-parallel
distinct
можно реализовать с помощью HashSet
- и вы будетеправильный.HashSet
может содержать все значения, которые уже видели , и просто не обрабатывать (отправлять в следующие операции) другие элементы - и теоретически вы будете выполнять непараллельную distinct
операцию.Но что, если известно, что Stream
- SORTED
?Подумайте об этом, это означает, что мы могли бы сохранить один элемент (в отличие от HashSet
, как и раньше), который был бы отмечен как seen
.В основном, если бы вы имели:
1,1,2,2,3
, это означало бы, что ваша операция с состоянием могла бы быть реализована поверх одного элемента, а не HashSet
;код будет выглядеть примерно так:
T seen = null;
....
if(seen == null) || (!currentElement.equals(seen)){
seen = currentElement;
// process seen;
}
Но эта оптимизация возможна только тогда, когда вы знаете, что поток равен SORTED
, поскольку, таким образом, вы знаете, что следующий приходящий элемент либо совпадает сВы уже видели или новый, который вы не могли видеть раньше в какой-либо другой предыдущей операции - это гарантируется операцией сортировки.
А теперь как реализовано parallel distinct
.Вы в основном задаете этот вопрос:
Тогда как будут создаваться пулы потоков
Точно так же, ничего не меняется с точки зрения потока, ForJoinPool
использует то же самоеКоличество потоков - единственное, что меняется, это реализация потока, очевидно.
Проще говоря, если ваш Stream
равен ORDERED
, внутренняя реализация использует LinkedHashSet
(на самом деле это несколько раз, поскольку в таком случае это действительно сокращает), чтобы сохранить ваш заказ ион использует ConcurrentHashMap
, если вы не заботитесь о порядке - то есть, если источник не упорядочен (как Set
), или вы использовали явное имя unordered
.Вы также можете посмотреть реализацию для sorted
, если вы действительно хотите знать, как это делается.
Таким образом, суть в том, что Fork Join Pool
не меняет реализацию на основе потока, он использует ту же модель.С другой стороны, основываясь на ваших операциях, Stream API может использовать некоторые данные с состоянием для промежуточных операций с состоянием, будь то HashSet/ConcurrentHashMap
или отдельный элемент и т. Д.