Влияние на производительность параллельных потоков Java - PullRequest
0 голосов
/ 07 февраля 2019

Каковы лучшие практики использования .stream().parallel()?

Например, если у вас есть куча блокирующих вызовов ввода-вывода и вы хотите проверить, если .anyMatch(...), параллельное выполнение этого кода кажется разумным.

Пример кода:

public boolean hasAnyRecentReference(JobId jobid) {
  <...>
  return pendingJobReferences.stream()
     .parallel()
     .anyMatch(pendingRef -> { 
       JobReference readReference = pendingRef.sync();
       Duration referenceAge = timeService.timeSince(readReference.creationTime());
       return referenceAge.lessThan(maxReferenceAge)
     });
}

На первый взгляд это выглядит разумно, потому что мы можем выполнять несколько блокировочных чтений одновременно, поскольку мы заботимся только о том, что совпадает, вместо проверки одно за другим (поэтому, если каждое чтение занимает 50 мс, нам нужно толькоwait ( 50ms * ОжидаемыйNumberOfNonRecentRefs ) / numThreads ).

Может ли введение этого кода в производственную среду иметь непредвиденные последствия для производительности другихчасти кодовой базы?

1 Ответ

0 голосов
/ 07 февраля 2019

РЕДАКТИРОВАТЬ: Как @edharned указывает .parallel() теперь использует CountedCompleter вместо вызова .join(), что имеет свои собственные проблемы, а также объяснил Эд в http://coopsoft.com/ar/Calamity2Article.html в разделе What is currently being done?.

Я полагаю, что приведенная ниже информация по-прежнему полезна для понимания того, почему фреймворк форк-соединения хитрый, а альтернативы, предложенные .parallel() среди выводов, все еще актуальны.


Хотя дух кода верен, реальный код может оказывать влияние на всю систему, которая использует .parallel(), хотя это совсем не очевидно.

Некоторое время назад я нашел статью, которая рекомендовалапротив этого: https://dzone.com/articles/think-twice-using-java-8, но до недавнего времени я не копал глубже.

Вот мои мысли после прочтения:

  1. .parallel() в JavaForkJoinPool.commonPool(), который представляет собой синглтон ForkJoinPool, совместно используемый всеми потоками (ForkJoinPool.commonPool() является общедоступным статическим методом, поэтому теоретически его могут использовать другие библиотеки / части кода)
  2. ForkJoinPool реализует воровство работыи имеет очереди для каждого потока в дополнение к общей очереди

    1. Воровство работы означает, что, когда поток простаивает, он будет искать дополнительную работу для выполнения
    2. Сначала я подумал:это определение, разве пул потоков cached также не выполняет кражу работы (даже если некоторые ссылки называют это совместным использованием рабочих мест для пулов кэшированных потоков)?
    3. Оказывается, существует некоторая неопределенность терминологии, когдаиспользуя слово idle:

      1. В пуле потоков cached поток простаивает только после завершения своей задачи. не становится бездействующим, если он заблокирован в ожидании блокирующего вызова
      2. В пуле потоков forkjoin поток простаивает либо после завершения своей задачи, либо когдавызывает метод .join() (который является специальным блокирующим вызовом) для подзадачи.

        Когда в подзадаче вызывается .join(), поток становится незанятым, ожидая завершения этой подзадачи.Во время простоя он попытается выполнить любую другую доступную задачу, даже если он находится в очереди другого потока (он крадет работу).

        [Это важный бит] Как только он нашел другую задачу для выполнения, он должен завершить ее, прежде чем возобновить свое первоначальное выполнение, даже если подзадача, которую он ожидал, завершается, пока потоквсе еще выполняет украденное задание.

        [Это также важно] Такое поведение при краже работы применимо только к потокам, которые вызывают .join().Если поток заблокирован чем-то другим, например, вводом-выводом, он становится бездействующим (то есть не может похитить работу).

  3. Java-потоки не позволяют вам предоставлять пользовательский ForkJoinPool, но https://github.com/amaembo/streamex делает

Мне потребовалось некоторое время, чтобы понять значение 2.3.2, поэтому я 'Я приведу краткий пример, чтобы проиллюстрировать проблему:

Примечание: это фиктивные примеры, но вы можете попасть в эквивалентные ситуации, не осознавая этого, используя потоки, которые внутренне выполняют работу с fork fork.

Также я буду использовать чрезвычайно упрощенный псевдокод, который служит только для иллюстрации проблемы .parallel (), но не обязательно имеет смысл в противном случае.

Допустим, мы реализуемсортировка слиянием

merge_sort(list):
    left, right = split(list)

    leftTask = mergeSortTask(left).fork()
    rightTask = mergeSortTaks(right).fork()

    return merge(leftTask.join(), rightTask.join())

Допустим, у нас есть еще один фрагмент кода, который выполняет следующее:

dummy_collect_results(queriesIds):
   pending_results = []

   for id in queriesIds: 
     pending_results += longBlockingIOTask(id).fork()

  // do more stuff

Что здесь происходит?

Когда вы пишете код сортировки слиянием, вы думаете, что вызовы сортировки не выполняют никаких операций ввода-вывода, поэтому ихИсполнение должно быть довольно детерминированным, верно?

Верно.То, чего вы не можете ожидать, это то, что, поскольку метод dummy_collect_results создал кучу длительных и блокирующих подзадач, когда потоки, выполняющие задачи слияния, блокируют на .join(), ожидая завершения подзадач, они могут начать выполнять одну из следующих задач:длинные подзадачи блокировки.

Это плохопоскольку, как упоминалось выше, после того, как длинная блокировка (при вводе-выводе, а не вызове .join(), поэтому поток не будет снова простаивать) была украдена, она должна быть завершена независимо от того, была ли подзадачапоток ждал завершения через .join() во время блокировки ввода / вывода.

Это делает выполнение задач сортировки слиянием более не детерминированным, поскольку выполняющие их потоки могут в итоге украсть интенсивные задачи ввода-вывода, сгенерированные кодом, полностью живущим где-то еще.

Этотакже довольно страшно и трудно поймать, потому что вы могли бы использовать .parallel() по всей вашей кодовой базе без каких-либо проблем, и все, что нужно, это один класс, который вводит длительные задачи при использовании .parallel() и внезапно все остальные частиваша кодовая база может получить противоречивую производительность.

Итак, мои выводы:

  1. Теоретически, .parallel() хорошо, если вы можете гарантировать, что все задачи, которыекогда-нибудь будет создано где-нибудь в вашем коде, короткие
  2. .parallel() могут иметь общесистемные последствия для производительности, которые неочевидны, если вы не знаете (например, если позже вы добавите один фрагмент кода, который использует .parallel() и имеет длинные задачи, вы можете повлиять на производительность всего кода, который использует .parallel())
  3. Потому чтоf 2. вам лучше вообще избегать .parallel() и либо использовать ExecutorCompletionService, либо использовать https://github.com/amaembo/streamex, что позволяет вам предоставить свой ForkJoinPool (что позволяет немного увеличить изоляцию).Более того, вы можете использовать https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53,, который дает вам еще более точный контроль над механизмом параллелизма.
...