Я получаю InputStream, который содержит несколько элементов, они сканируются, анализируются, итерируются потоком в последовательном порядке (в том же порядке, который они имели в InputStream
), а затем сохраняются в БД. Это работает нормально.
Теперь я пытаюсь выполнить итерацию потока параллельно с Stream<T>.parallel()
, поэтому, пока один поток заблокирован, сохраняется, другие могут сканировать InputStream и сохраняться.
Затем я попытался распараллелить результирующее Stream<MyElement>
с Stream<T>.parallel()
. Чтобы проверить, работает ли распараллеливание, я добавил в поток функцию карты, которая добавляет случайную задержку. Я ожидал, что полученные элементы были напечатаны в случайном порядке.
Но результат не является ожидаемым. Элементы по-прежнему отображаются в порядке файлов.
Есть ли способ правильно итерировать этот поток параллельно?
public class FromInputStreamToParallelStream {
public static Stream<MyElement> getStream(InputStream is) {
try (var scanner = new Scanner(is)) {
return scanner//
.useDelimiter("DELIMITER")
.tokens()
.parallel()
.map(MyElementParser::parse);
}
}
@Test
public void test() throws IOException {
try (InputStream in = Files.newInputStream(Paths.get("my-file.xml"));) {
getStream(in)
.map(FromInputStreamToParallelStream::sleepRandom)
.forEach(System.out::println);
}
}
private static MyElement sleepRandom(MyElement element) {
var randomNumber = new Random().nextInt(10);
System.out.println("wait. " + randomNumber);
try {
TimeUnit.SECONDS.sleep(randomNumber);
} catch (InterruptedException e) {
e.printStackTrace();
}
return element;
}
}
Полагаю, мне нужно реализовать свой собственный Spliterator<T>
.
Заранее спасибо.