потоки ленивы;Вся работа выполняется, когда вы начинаете работу терминала.В вашем случае операция терминала - .collect(Collectors.toList())
, которую вы вызываете в потоке main
по результату get()
.Следовательно, фактическая работа будет выполняться так же, как если бы вы построили весь поток в потоке main
.
Чтобы ваш пул имел эффект, вы должны переместить операцию терминала вотправленное задание:
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
Мы также можем продемонстрировать актуальность операции терминала, построив поток в потоке main
и отправив только операцию терминала в пул:
Stream<Integer> stream = testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();
Но вы должны иметь в виду, что это недокументированное поведение, которое не гарантируется.Фактический ответ должен состоять в том, что Stream API в его текущей форме, без контроля потока (и без помощи при работе с проверенными исключениями), не подходит для параллельных операций ввода-вывода.