Как правило, вы не должны принимать конкретный порядок обработки для параллельных потоков, но предполагая, что ваш алгоритм корректен, независимо от фактического порядка обработки, вы можете рассуждать о связи между порядком и производительностью.
Реализация Stream уже разработана для того, чтобы использовать преимущества последовательных элементов - для локального процессора. Поэтому, если у вас есть поток из сотен элементов, скажем IntStream.range(0, 100)
для упрощения, и обработайте его четырьмя бездействующими ядрами ЦП, реализация разделит его на четыре диапазона: 0-25, 25-50, 50-75 и 75. -100, обрабатывать самостоятельно, в лучшем случае. Таким образом, каждый процессор будет обрабатывать последовательные элементы локально и получать выгоду от низкоуровневых эффектов, например, одновременная загрузка нескольких элементов массива в локальный кеш и т. д.
Итак, проблема с вашим методом doComputationallyIntensiveThing
заключается в том, что кеш (и ваш мониторинг) не работают локально. Таким образом, чтобы остаться в приведенном выше примере, операция будет начинаться с параллельного выполнения 0
, 25
, 50
и 75
одновременно, и если все они завершатся через одинаковое время, за ним последует параллельная оценка 1
, 26
, 51
и 76
. Если какой-либо из четырех элементов первой оценки «выигрывает» и определяет кэшированные данные, он подойдет только для одного из следующих четырех значений. Если сроки потоков изменятся, соотношение станет еще хуже.
Одним из решений было бы изменить doComputationallyIntensiveThing
на использование локальных кэшей потоков, чтобы получить выгоду от обработки последовательных элементов в каждом потоке. Тогда способ, которым вы определили операцию Stream, идеально подходит для этой операции, которая выигрывает от многократного просмотра одного и того же элемента arr1
. Тем не менее, вы можете упростить код и избавиться от множества накладных расходов:
Arrays.stream(arr1).parallel().forEach(i1 ->
Arrays.stream(arr2).forEach(i2 ->
Arrays.stream(arr3).forEach(i3 ->
doComputationallyIntensiveThing(i1, i2, i3))));
Однако после этого возникает проблема очистки локальных кеш-потоков, поскольку параллельный поток использует пул потоков вне вашего элемента управления.
Более простое решение для способа, который сегодня работает, заключается в изменении вложенности:
Arrays.stream(arr2).parallel().forEach(i2 ->
Arrays.stream(arr1).forEach(i1 ->
Arrays.stream(arr3).forEach(i3 ->
doComputationallyIntensiveThing(i1, i2, i3))));
Теперь arr2
разделяется так, как описано выше. Затем каждый рабочий поток будет выполнять ту же итерацию в течение arr1
, обрабатывая каждый его элемент столько раз, сколько в arr3
элементов. Это позволяет использовать поведение кэширования между потоками, но существует риск того, что потоки выйдут из синхронизации из-за различий во времени и окажутся в той же ситуации, что и раньше.
Гораздо лучшая альтернатива - перепроектировать doComputationallyIntensiveThing
, создав два разных метода: один готовит операцию для определенного элемента arr1
, возвращая объект, содержащий кэшированные данные для элемента, а другой - для фактической обработки с использованием кэшированные данные:
Arrays.stream(arr1).parallel()
.mapToObj(i1 -> prepareOperation(i1))
.forEach(cached ->
Arrays.stream(arr2).forEach(i2 ->
Arrays.stream(arr3).forEach(i3 ->
doComputationallyIntensiveThing(cached, i2, i3))));
Здесь каждый экземпляр, возвращаемый prepareOperation
, связан с конкретным элементом arr1
и действует как локальный кеш для любых данных, связанных с ним, но обычно собирает мусор, когда обработка определенного элемента завершена. Так что никакой очистки не требуется.
В принципе, это также будет работать, если prepareOperation
вернет только пустой объект-держатель, который будет заполнен первым вызовом doComputationallyIntensiveThing
для конкретного элемента.