Как заставить параллельное выполнение BufferedStream.lines () с flatMap ()? - PullRequest
2 голосов
/ 23 октября 2019

У меня есть некоторый код, который выглядит следующим образом (упрощенный псевдокод):

[...]
// stream constructed of series of web service calls
Stream<InputStream> slowExternalSources = StreamSupport.stream(spliterator, false);
[...]

, затем этот

public Stream<String> getLines(Stream<InputStream> slowExternalSources) {
  return slowExternalSources.flatMap(is -> new BufferedReader(new InputStreamReader(is)).lines())
     .onClose(() -> is.close());
}

, а затем этот

Stream<String> lineStream = getLines();
lineStream.parallel().forEach( ... do some fast CPU-intensive stuff here ... }

Я пытался заставить этот код выполняться с некоторым уровнем распараллеливания.

Проверка в jps / jstack / jmc показывает, что все чтение InputStream происходит в главномпоток, и не распараллеливание вообще.

Возможные следы:

  • BufferedReader.lines() использует Spliterator с parallel=false для создания потока (источник: см. Javaисточники)

  • Мне кажется, я читал некоторые статьи, в которых говорилось, что flatMap плохо взаимодействует с parallel(). Я не могу найти эту статью прямо сейчас.

Как я могу исправить этот код так, чтобы он работал параллельно?

Я хотел бы сохранить потоки Java8если возможно, чтобы избежать перезаписи существующего кода, который ожидает поток.

NOTE Я добавил java.util.concurrent к тегам, потому что я подозреваю, что это может быть частью ответа, даже если этоне часть вопроса.

...