Есть ли способ частично контролировать порядок параллельного потока Java? - PullRequest
0 голосов
/ 29 апреля 2019

Я понимаю, что не имеет смысла пытаться заставить параллельный поток выполнять каждый элемент в определенном порядке.Так как он обрабатывает данные параллельно, в порядке упорядочения явно будет недетерминизм.Однако мне было интересно, возможно ли заставить его выполнить «сортировку» по порядку или, по крайней мере, попытаться сохранить порядок, похожий на тот, который был бы, если бы он был последовательным.

Вариант использования

Мне нужно выполнить код для каждой комбинации значений из нескольких массивов.Я создаю поток всех возможных комбинаций индексов следующим образом (имена переменных были запутаны, чтобы не раскрывать частную информацию, я обещаю, что обычно не называю свои переменные arr1, arr2 и т. Д.):

public static void doMyComputation(double[] arr1, double[] arr2, double[] arr3) {
  DoubleStream.of(arr1).mapToObj(Double::valueOf)
    .flatMap(
      i1->DoubleStream.of(arr2).mapToObj(Double::valueOf)
        .flatMap(
          i2->DoubleStream.of(arr3).mapToObj(Double::valueOf)
            .flatMap(
              i3->new Inputs(i1,i2,i3)
             )
        )
    )
    .parallel()
    .forEach(input -> doComputationallyIntensiveThing(input.i1, input.i2, input.i3);

Это прекрасно работает (или, по крайней мере, реальная версия делает, я упростил некоторые вещи для фрагмента кода, который я разместил здесь, так что, возможно, я испортил фрагмент кода).Я ожидаю, что из-за параллелизма я не собираюсь видеть значения в порядке arr1[0], arr2[0], arr3[0], затем arr1[0], arr2[0], arr3[1] и т. Д. Однако я надеялся, что по крайней мере в основном я буду видеть входные данные с первыми несколькими значениями изarr1 сначала, а затем медленно продвигайтесь к концу arr1.Я был удивлен, увидев, что это даже близко не подошло к этому.

Проблема в том, что в этом методе doComputationallyIntensiveThing есть некоторое кэширование, которое ведет себя хорошо, только если мы видим много одинаковых значений изarr1 вместе.Если значения вводятся совершенно случайно, то кэширование приносит больше вреда, чем пользы.

Есть ли способ намека на поток для выполнения входов в порядке, который стремится сгруппировать входы вместе по значениямв arr1?

Если нет, то я могу, вероятно, просто создать новый поток для каждого значения в arr1, и все будет хорошо, но я хотел бы посмотреть, есть ли способсделать все это в одном потоке.

1 Ответ

1 голос
/ 30 апреля 2019

Как правило, вы не должны принимать конкретный порядок обработки для параллельных потоков, но предполагая, что ваш алгоритм корректен, независимо от фактического порядка обработки, вы можете рассуждать о связи между порядком и производительностью.

Реализация Stream уже разработана для того, чтобы использовать преимущества последовательных элементов - для локального процессора. Поэтому, если у вас есть поток из сотен элементов, скажем IntStream.range(0, 100) для упрощения, и обработайте его четырьмя бездействующими ядрами ЦП, реализация разделит его на четыре диапазона: 0-25, 25-50, 50-75 и 75. -100, обрабатывать самостоятельно, в лучшем случае. Таким образом, каждый процессор будет обрабатывать последовательные элементы локально и получать выгоду от низкоуровневых эффектов, например, одновременная загрузка нескольких элементов массива в локальный кеш и т. д.

Итак, проблема с вашим методом doComputationallyIntensiveThing заключается в том, что кеш (и ваш мониторинг) не работают локально. Таким образом, чтобы остаться в приведенном выше примере, операция будет начинаться с параллельного выполнения 0, 25, 50 и 75 одновременно, и если все они завершатся через одинаковое время, за ним последует параллельная оценка 1, 26, 51 и 76. Если какой-либо из четырех элементов первой оценки «выигрывает» и определяет кэшированные данные, он подойдет только для одного из следующих четырех значений. Если сроки потоков изменятся, соотношение станет еще хуже.

Одним из решений было бы изменить doComputationallyIntensiveThing на использование локальных кэшей потоков, чтобы получить выгоду от обработки последовательных элементов в каждом потоке. Тогда способ, которым вы определили операцию Stream, идеально подходит для этой операции, которая выигрывает от многократного просмотра одного и того же элемента arr1. Тем не менее, вы можете упростить код и избавиться от множества накладных расходов:

Arrays.stream(arr1).parallel().forEach(i1 ->
    Arrays.stream(arr2).forEach(i2 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

Однако после этого возникает проблема очистки локальных кеш-потоков, поскольку параллельный поток использует пул потоков вне вашего элемента управления.

Более простое решение для способа, который сегодня работает, заключается в изменении вложенности:

Arrays.stream(arr2).parallel().forEach(i2 ->
    Arrays.stream(arr1).forEach(i1 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

Теперь arr2 разделяется так, как описано выше. Затем каждый рабочий поток будет выполнять ту же итерацию в течение arr1, обрабатывая каждый его элемент столько раз, сколько в arr3 элементов. Это позволяет использовать поведение кэширования между потоками, но существует риск того, что потоки выйдут из синхронизации из-за различий во времени и окажутся в той же ситуации, что и раньше.

Гораздо лучшая альтернатива - перепроектировать doComputationallyIntensiveThing, создав два разных метода: один готовит операцию для определенного элемента arr1, возвращая объект, содержащий кэшированные данные для элемента, а другой - для фактической обработки с использованием кэшированные данные:

Arrays.stream(arr1).parallel()
    .mapToObj(i1 -> prepareOperation(i1))
    .forEach(cached ->
        Arrays.stream(arr2).forEach(i2 ->
            Arrays.stream(arr3).forEach(i3 ->
                doComputationallyIntensiveThing(cached, i2, i3))));

Здесь каждый экземпляр, возвращаемый prepareOperation, связан с конкретным элементом arr1 и действует как локальный кеш для любых данных, связанных с ним, но обычно собирает мусор, когда обработка определенного элемента завершена. Так что никакой очистки не требуется.

В принципе, это также будет работать, если prepareOperation вернет только пустой объект-держатель, который будет заполнен первым вызовом doComputationallyIntensiveThing для конкретного элемента.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...