Как я могу немедленно передать задачи из одного ThreadPool в другой? - PullRequest
0 голосов
/ 09 апреля 2019

У меня есть список элементов ввода, которые я хочу поместить в несколько потоков ThreadPools.Допустим, это мой ввод:

final List<Integer> ints = Stream.iterate(1, i -> i + 1).limit(100).collect(Collectors.toList());

Это три функции, которые я хочу, чтобы элементы проходили друг за другом:

final Function<Integer, Integer> step1 =
        value -> { // input from the ints list
            return value * 2;
        };

final Function<Integer, Double> step2 =
        value -> { // input from the previous step1
            return (double) (value * 2); //
        };

final Function<Double, String> step3 =
        value -> { // input from the previous step2
            return "Result: " + value * 2;
        };

И это будут пулы для каждого шага:

final ExecutorService step1Pool = Executors.newFixedThreadPool(4);
final ExecutorService step2Pool = Executors.newFixedThreadPool(3);
final ExecutorService step3Pool = Executors.newFixedThreadPool(1);

Я хочу, чтобы каждый элемент проходил через step1Pool и применял step1.Как только один элемент завершен, его результат должен закончиться в step2pool, так что step2 может быть применено здесь.Как только что-то выполнено в step2Pool, оно должно быть поставлено в очередь в step3Pool и step3.В моей основной ветке я хочу подождать, пока у меня не появятся все результаты из step3.Порядок, в котором обрабатывается каждый элемент, не имеет значения.Только то, что все они проходят через step1 -> step2 -> step3 в правильном пуле потоков.

По сути, я хочу распараллелить Stream.map, сразу переместить каждый результат в следующую очередь иподождите, пока я не получу ints.size() результатов из моего последнего пула потоков назад.

Есть ли простой способ добиться в Java?

Ответы [ 2 ]

4 голосов
/ 09 апреля 2019

Я верю, что CompletableFuture поможет вам здесь!

List<CompletableFuture<String>> futures = ints.stream()
            .map(i -> CompletableFuture.supplyAsync(() -> step1.apply(i), step1Pool)
                    .thenApplyAsync(step2, step2Pool)
                    .thenApplyAsync(step3, step3Pool))
            .collect(Collectors.toList());
List<String> result = futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
0 голосов
/ 09 апреля 2019

Лучше использовать потоки для этого:

List<String> stringList = Stream.iterate(1, i -> i + 1)
                .limit(100)
                .parallel()
                .map(step1)
                .map(step2)
                .map(step3)
                .collect(Collectors.toList());
...