Вместо
acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))
вам понадобится
applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)
для компиляции кода.«Accept» - это действие, которое потребляет значение без возврата нового значения, «apply» - это действие, которое создает новое значение.
Однако логическая ошибка все еще существует.Будущее, возвращаемое timeoutAfter
, будет завершено исключительно , поэтому зависимые этапы также будут выполняться исключительно, без оценки функций, поэтому этот метод связывания не подходит для замены исключения значением по умолчанию.
Еще хуже, исправление этого приведет к созданию нового будущего, которое будет завершено любым исходным будущим, но это не влияет на действие result.add(itemGroup)
, выполняемое в одном из исходных фьючерсов.В вашем коде результирующее будущее используется только для ожидания завершения, но не для оценки результата.Поэтому, когда истечет время ожидания, вы перестанете ждать завершения, в то время как все еще могут быть фоновые потоки, изменяющие список.
Правильная логика состоит в том, чтобы отделить этапы извлечения значения, которые могут быть замененызначение по умолчанию по истечении времени ожидания и шаг добавления результата, либо извлеченного значения, либо значения по умолчанию, в список результатов.Затем вы можете дождаться завершения всех add
действий.По тайм-ауту могут продолжаться getUpdatedItemGroup
оценки (нет способа остановить их выполнение), но их результат будет игнорироваться, поэтому он не влияет на список результатов.
Это также стоитуказывая на то, что создание нового ScheduledExecutorService
для каждого элемента списка (который не отключается после использования, что еще хуже) не является правильным подходом.
// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
.map(inPutItemGroup ->
timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
CompletableFuture.supplyAsync(
() -> getUpdatedItemGroup(inPutItemGroup), executorService),
inPutItemGroup)
.thenAccept(itemGroup -> {
// this is thread safe, but questionable,
// e.g. the result list order is not maintained
if(null != itemGroup) result.add(itemGroup);
})
)
.toArray(CompletableFuture<?>[]::new);
// this will wait till all threads are completed
CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
String errorMsg = String.format("Exception occurred while executing parallel call");
log.error(errorMsg, e);
endResult = response.getItemGroupList();
}
finally {
delayer.shutdown();
}
Response finalResponse = Response.builder()
.itemGroupList(endResult)
.build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {
es.schedule(() -> f.complete(value), timeout, unit);
return f;
}
Здесь supplyAsync
производит CompletableFuture
, который даст результат оценки getUpdatedItemGroup
.Вызов timeoutAfter
запланирует завершение со значением по умолчанию после тайм-аута, без создания нового будущего, затем зависимое действие, связанное через thenAccept
, добавит значение результата в список result
.
Обратите внимание, что synchronizedList
позволяет добавлять элементы из нескольких потоков, но добавление из нескольких потоков приведет к непредсказуемому порядку, не связанному с порядком списка источников.