У меня есть некоторый код, который выглядит следующим образом (упрощенный псевдокод):
[...]
// stream constructed of series of web service calls
Stream<InputStream> slowExternalSources = StreamSupport.stream(spliterator, false);
[...]
, затем этот
public Stream<String> getLines(Stream<InputStream> slowExternalSources) {
return slowExternalSources.flatMap(is -> new BufferedReader(new InputStreamReader(is)).lines())
.onClose(() -> is.close());
}
, а затем этот
Stream<String> lineStream = getLines();
lineStream.parallel().forEach( ... do some fast CPU-intensive stuff here ... }
Я пытался заставить этот код выполняться с некоторым уровнем распараллеливания.
Проверка в jps
/ jstack
/ jmc
показывает, что все чтение InputStream
происходит в главномпоток, и не распараллеливание вообще.
Возможные следы:
BufferedReader.lines()
использует Spliterator
с parallel=false
для создания потока (источник: см. Javaисточники)
Мне кажется, я читал некоторые статьи, в которых говорилось, что flatMap
плохо взаимодействует с parallel()
. Я не могу найти эту статью прямо сейчас.
Как я могу исправить этот код так, чтобы он работал параллельно?
Я хотел бы сохранить потоки Java8если возможно, чтобы избежать перезаписи существующего кода, который ожидает поток.
NOTE Я добавил java.util.concurrent к тегам, потому что я подозреваю, что это может быть частью ответа, даже если этоне часть вопроса.