Как полностью распараллелить поток Java8 через буферизацию? - PullRequest
1 голос
/ 09 октября 2019

У меня есть такой код:

Stream<Item> stream = listPaths.parallelStream().flatMap(path -> { ... })

Я также добавил это:

    System.setProperty(
            "java.util.concurrent.ForkJoinPool.common.parallelism",
            String.valueOf(Runtime.getRuntime().availableProcessors() * 4));

Позже я звоню stream.forEach(...)

Однако у меня естьобнаружил, что на машине с 32 ядрами используются только 5-8 ядер.

Я считаю, что происходит то, что код внутри flatMap () и код внутри forEach () страдают задержкой ввода-выводавыдает различные внешние ресурсы и возвращает данные в формате «подходит и запускает» - плохая комбинация с «потянувшей» природой потоков.

Есть ли простые (идиоматические, а не «иди, напишите свои собственные 200 строк»кода "), чтобы обернуть поток в своего рода" буфер потока ", который бы полностью использовал исходный поток (тянет на максимальных потоках) при подаче forEach ()?

1 Ответ

0 голосов
/ 09 октября 2019

На мой взгляд, лучший способ - использовать реактивные потоки. Есть много способов сделать это:

  • Использование <Flowable>
  • Использование RxJava

Или, в-третьих, использование Spring's Reactor framework Они всеесть механизм планирования. Лично я могу использовать один из этих алгоритмов и все равно быть им доволен, если не захочу написать 200 строк кода.

...