Как я могу одновременно выполнять различные операции?
Один поток может выполнять одну задачу за раз. Если вы хотите выполнять несколько операций одновременно, вам нужно переключиться на другие потоки.
Вы можете создать новый поток или использовать ExecutorService для управления пулом потоков, поставить задачу в очередь и выполнить задачу за вас.
Поможет ли изменение stream () наrallelStream ()?
Да, это так. Внутренне parallelStream()
использует ForkJoinPool.commonPool()
для запуска задач для вас. Но имейте в виду, что parallelStream()
не имеет никаких гарантий относительно параллельности возвращаемого потока (но пока текущая реализация возвращает параллельный)
Синхронизируется ли оно при добавлении в uberList?
Вы должны выполнить часть синхронизации в forEach
конвейере. Обычно вы не хотите вызывать collection.add()
внутри forEach
для создания коллекции. Вместо этого вы должны использовать .map().collect(toX())
методы. Это освобождает вас от синхронизирующей части:
- Не требуется знать о вашей локальной переменной (в данном случае
uberlist
. И она не изменит ее при выполнении, поможет уменьшить множество странных ошибок, вызванных параллелизмом
- Вы можете свободно изменять тип коллекции в
.collect()
части. Это дает вам больше контроля над типом результата.
- При использовании с параллельным потоком не требует поточно-ориентированной обработки или синхронизации для заданной коллекции. Потому что «несколько промежуточных результатов могут быть созданы, заполнены и объединены для обеспечения изоляции изменяемых структур данных» ( Подробнее об этом здесь )
Итак, вам нужно выполнить несколько одинаковых вызовов службы одновременно и собрать свой результат в список.
Вы можете сделать это просто параллельным потоком:
uberList = map.entrySet().stream()
.parallel() // Use .stream().parallel() to force parallism. The .parallelStream() does not guarantee that the returned stream is parallel stream
.filter(yourCondition)
.map(e -> yourService.methodCall(e))
.collect(Collectors.toList());
Довольно круто, не правда ли?
Но, как я уже говорил, в параллельном потоке по умолчанию используется ForkJoinPool.commonPool()
для организации очередей и выполнения потоков.
Плохая часть, если ваши yourService.methodCall(e)
выполняют тяжелые операции ввода-вывода (например, HTTP-вызов, даже вызов DB) ... или длительное выполнение задачи , тогда это может исчерпать пул другие входящие задачи будут стоять в очереди навсегда, ожидая выполнения.
Таким образом, обычно все другие задачи зависят от этого общего пула (не только ваш yourService.methodCall(e)
, но и весь другой параллельный поток) будет замедляться из-за времени ожидания.
Чтобы решить эту проблему, вы можете принудительно выполнить параллелизм в своем собственном пуле fork-join:
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // Typically set it to Runtime.availableProcessors()
uberlist = forkJoinPool.submit(() -> {
return map.entrySet().stream()
.parallel() // Use .stream().parallel() to force parallism. The .parallelStream() does not guarantee that the returned stream is parallel stream
.filter(yourCondition)
.map(e -> yourService.methodCall(e))
.collect(Collectors.toList());
}).get();