У меня есть список элементов ввода, которые я хочу поместить в несколько потоков ThreadPools.Допустим, это мой ввод:
final List<Integer> ints = Stream.iterate(1, i -> i + 1).limit(100).collect(Collectors.toList());
Это три функции, которые я хочу, чтобы элементы проходили друг за другом:
final Function<Integer, Integer> step1 =
value -> { // input from the ints list
return value * 2;
};
final Function<Integer, Double> step2 =
value -> { // input from the previous step1
return (double) (value * 2); //
};
final Function<Double, String> step3 =
value -> { // input from the previous step2
return "Result: " + value * 2;
};
И это будут пулы для каждого шага:
final ExecutorService step1Pool = Executors.newFixedThreadPool(4);
final ExecutorService step2Pool = Executors.newFixedThreadPool(3);
final ExecutorService step3Pool = Executors.newFixedThreadPool(1);
Я хочу, чтобы каждый элемент проходил через step1Pool
и применял step1
.Как только один элемент завершен, его результат должен закончиться в step2pool
, так что step2
может быть применено здесь.Как только что-то выполнено в step2Pool
, оно должно быть поставлено в очередь в step3Pool
и step3
.В моей основной ветке я хочу подождать, пока у меня не появятся все результаты из step3
.Порядок, в котором обрабатывается каждый элемент, не имеет значения.Только то, что все они проходят через step1
-> step2
-> step3
в правильном пуле потоков.
По сути, я хочу распараллелить Stream.map
, сразу переместить каждый результат в следующую очередь иподождите, пока я не получу ints.size()
результатов из моего последнего пула потоков назад.
Есть ли простой способ добиться в Java?