У меня есть список entities
.Я выполняю параллель forEach
в этом списке и выполняю некоторые операции с каждым entity
.Я запускаю параллель forEach
в пределах ForkJoinPool
, чтобы достичь желаемого параллелизма.
Схема моего существующего кода выглядит следующим образом:
ForkJoinPool pool = new ForkJoinPool(4);
Consumer<Entity> consumer = (Entity entity) -> {
try {
doSomething(entity);
} catch(Exception cause) {
}
};
try {
pool.submit(() -> {
entities.stream()
.parallel()
.forEach(consumer);
}).get();
} finally {
pool.shutdown();
}
Как doSomething()
метод может вызвать исключение по какой-либо причине, например, из-за сбоя сетевого подключения;Я хотел бы остановить параллельную обработку, если число последовательных ошибок достигает порога ошибки.Итак, следующий набросок, о котором я подумал:
int errorThreshold = 200;
AtomicInteger errorCount = new AtomicInteger(0);
ForkJoinPool pool = new ForkJoinPool(parallelism);
Consumer<Entity> consumer = (Entity entity) -> {
boolean success = false;
try {
doSomething(entity);
success = true;
} catch(Exception cause) {
}
if (!success) {
if (errorCount.incrementAndGet() == errorThreshold) {
pool.shutdownNow();
}
} else {
errorCount.set(0);
}
};
try {
pool.submit(() -> {
entities.stream()
.parallel()
.forEach(consumer);
}).get();
} finally {
pool.shutdown();
}
Это лучший способ добиться того, чего я хочу?
PS: я использую jdk8.
Обновление
Пример кода:
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Main {
private class Entity {
private int id;
public Entity(int id) {
this.id = id;
}
}
private void execute() throws Exception {
List<Entity> entities = IntStream.range(0, 1000)
.mapToObj(Entity::new)
.collect(Collectors.toList());
int errorThreshold = 5;
AtomicInteger errorCount = new AtomicInteger(0);
ForkJoinPool pool = new ForkJoinPool(4);
Consumer<Entity> consumer = (Entity entity) -> {
boolean success = false;
try {
doSomething(entity);
} catch (Exception cause) {
System.err.println(cause.getMessage());
}
if (!success) {
if (errorCount.incrementAndGet() == errorThreshold) {
pool.shutdownNow();
}
} else {
errorCount.set(0);
}
};
try {
pool.submit(() -> entities
.stream()
.parallel()
.forEach(consumer))
.get();
} catch (Exception cause) {
if (CancellationException.class.isInstance(cause)) {
System.out.println("ForkJoinPool stopped due to consecutive error");
} else {
throw cause;
}
} finally {
if (!pool.isTerminated()) {
pool.shutdown();
}
}
}
private void doSomething(Entity entity) {
if (isPrime(entity.id)) {
throw new RuntimeException("Exception occurred for ID: " + entity.id);
}
System.out.println(entity.id);
}
private boolean isPrime(int n) {
if (n == 2) {
return true;
}
if (n == 0 || n == 1 || n % 2 == 0) {
return false;
}
int limit = (int) Math.ceil(Math.sqrt(n));
for (int i = 3; i <= limit; i += 2) {
if (n % i == 0) {
return false;
}
}
return true;
}
public static void main(String[] args) throws Exception {
new Main().execute();
}
}