Здесь есть несколько вещей.Во-первых, parallelStream()
по умолчанию использует общий ForkJoinPool
, что также позволяет вызывающему потоку участвовать.Это означает, что если в вызывающем потоке в настоящий момент выполняется одна из медленных задач, она должна завершиться, прежде чем вызывающая сторона вернет элемент управления.
Это можно увидеть, немного изменив код для регистрации потокаимена и журнал, когда закончите wating:
private static void log(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
System.out.println(sdf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + " " + msg);
}
public static void main(String[] args) {
Random random = new Random();
List<Integer> nums = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
Optional<Integer> num = nums.parallelStream()
.map(n -> {
long delay = Math.abs(random.nextLong()) % 10000;
log("Waiting on " + n + " for " + delay + " ms");
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
System.err.println("Interruption error");
}
log("finished waiting");
return n * n;
})
.filter(n -> n < 30)
.peek(n -> log("Found match: " + n))
.findAny();
log("First match: " + num);
}
Пример вывода:
13:56:52.954 [main] Waiting on 9 for 9936 ms
13:56:52.956 [ForkJoinPool.commonPool-worker-1] Waiting on 4 for 7436 ms
13:56:52.970 [ForkJoinPool.commonPool-worker-2] Waiting on 1 for 6523 ms
13:56:52.983 [ForkJoinPool.commonPool-worker-3] Waiting on 6 for 7488 ms
13:56:59.494 [ForkJoinPool.commonPool-worker-2] finished waiting
13:56:59.496 [ForkJoinPool.commonPool-worker-2] Found match: 1
13:57:00.392 [ForkJoinPool.commonPool-worker-1] finished waiting
13:57:00.392 [ForkJoinPool.commonPool-worker-1] Found match: 16
13:57:00.471 [ForkJoinPool.commonPool-worker-3] finished waiting
13:57:02.892 [main] finished waiting
13:57:02.894 [main] First match: Optional[1]
Здесь, как вы можете видеть, найдено 2 совпадения, но основной поток все еще занят, так чтоне может вернуть совпадение сейчас.
Это не всегда объясняет все случаи, хотя:
13:58:52.116 [main] Waiting on 9 for 5256 ms
13:58:52.143 [ForkJoinPool.commonPool-worker-1] Waiting on 4 for 4220 ms
13:58:52.148 [ForkJoinPool.commonPool-worker-2] Waiting on 1 for 2136 ms
13:58:52.158 [ForkJoinPool.commonPool-worker-3] Waiting on 6 for 7262 ms
13:58:54.294 [ForkJoinPool.commonPool-worker-2] finished waiting
13:58:54.295 [ForkJoinPool.commonPool-worker-2] Found match: 1
13:58:56.364 [ForkJoinPool.commonPool-worker-1] finished waiting
13:58:56.364 [ForkJoinPool.commonPool-worker-1] Found match: 16
13:58:57.399 [main] finished waiting
13:58:59.422 [ForkJoinPool.commonPool-worker-3] finished waiting
13:58:59.424 [main] First match: Optional[1]
Это может быть объяснено тем, как пул объединения вилок объединяет результаты.Кажется, что некоторые улучшения возможны.
В качестве альтернативы, вы действительно могли бы сделать это, используя CompletableFuture
:
// you should probably also pass your own executor to supplyAsync()
List<CompletableFuture<Integer>> futures = nums.stream().map(n -> CompletableFuture.supplyAsync(() -> {
long delay = Math.abs(random.nextLong()) % 10000;
log("Waiting on " + n + " for " + delay + " ms");
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
System.err.println("Interruption error");
}
log("finished waiting");
return n * n;
})).collect(Collectors.toList());
CompletableFuture<Integer> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(unused -> futures.stream().map(CompletableFuture::join).filter(n -> n < 30).findAny().orElse(null));
// shortcircuiting
futures.forEach(f -> f.thenAccept(r -> {
if (r < 30) {
log("Found match: " + r);
result.complete(r);
}
}));
// cancelling remaining tasks
result.whenComplete((r, t) -> futures.forEach(f -> f.cancel(true)));
log("First match: " + result.join());
Выход:
14:57:39.815 [ForkJoinPool.commonPool-worker-1] Waiting on 0 for 7964 ms
14:57:39.815 [ForkJoinPool.commonPool-worker-3] Waiting on 2 for 5743 ms
14:57:39.817 [ForkJoinPool.commonPool-worker-2] Waiting on 1 for 9179 ms
14:57:45.562 [ForkJoinPool.commonPool-worker-3] finished waiting
14:57:45.563 [ForkJoinPool.commonPool-worker-3] Found match: 4
14:57:45.564 [ForkJoinPool.commonPool-worker-3] Waiting on 3 for 7320 ms
14:57:45.566 [main] First match: 4
Обратите внимание, чтоcancel(true)
на самом деле не отменяет текущие задачи (например, прерывание не произойдет), но предотвращает запуск дальнейших задач (вы даже можете видеть, что это может быть не сразу, поскольку работник 3 все еще начал выполнять следующую).
Вы также должны использовать своего собственного исполнителя, с соответствующим размером, зависящим от того, является ли он более интенсивным использованием ЦП или ввода-вывода.Как вы можете видеть, по умолчанию используется общий пул, и поэтому он не использует все ядра.
allOf()
в основном необходим, если совпадение не найдено.Если вы можете гарантировать, что есть хотя бы одно совпадение, вы можете просто использовать `new CompletableFuture () вместо этого.
Наконец, в качестве простого подхода я повторил проверку filter
, но ее легко переместитьЛогика внутри основной логики, вернуть null
или маркер, а затем проверить это в обоих местах.
См. также Как создать будущее, которое будет завершено, когда будет завершено любое из заданных CompletableFuturesс результатом, который соответствует определенному предикату?