Завершаемое будущее с тайм-аутом не работает - PullRequest
1 голос
/ 24 апреля 2019

Я новое Завершаемое Будущее. Я пытаюсь вызвать метод параллельно для списка элементов (которые являются аргументами), а затем объединить результаты, чтобы создать окончательный ответ. Я также пытаюсь установить тайм-аут 50 мс, чтобы, если вызов не вернется через 50 мс, я верну значение по умолчанию.

Пока я пробовал это:

    {

     List<ItemGroup> result = Collections.synchronizedList(Lists.newArrayList());

    try {
     List<CompletableFuture> completableFutures = response.getItemGroupList().stream()
     .map(inPutItemGroup -> 
       CompletableFuture.runAsync(() -> {
           final ItemGroup itemGroup = getUpdatedItemGroup(inPutItemGroup);               //call which I am tryin to make parallel

           // this is thread safe
           if (null != itemGroup) {
                  result.add(itemGroup); //output of the call
           }
        }, executorService).acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS),inPutItemGroup))  //this line throws error     
     .collect(Collectors.toList());

// this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()]))
                        .join();
} catch (final Throwable t) {
     final String errorMsg = String.format("Exception occurred while rexecuting parallel call");
                log.error(errorMsg, e);
                result = response.getItemGroupList(); //default value - return the input value if error
    }

    Response finalResponse = Response.builder()
                    .itemGroupList(result)
                    .build();

    }

     private <T> CompletableFuture<T> timeoutAfter(final long timeout, final TimeUnit unit) {
            CompletableFuture<T> result = new CompletableFuture<T>();

            //Threadpool with 1 thread for scheduling a future that completes after a timeout
            ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
            String message = String.format("Process timed out after %s %s", timeout, unit.name().toLowerCase());
            delayer.schedule(() -> result.completeExceptionally(new TimeoutException(message)), timeout, unit);
            return result;
     }

Но я продолжаю получать ошибку, говоря:

 error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
    [javac]                             itemGroup))

incompatible types: inference variable T has incompatible bounds
    [javac]                     .collect(Collectors.toList());
    [javac]                             ^
    [javac]     equality constraints: CompletableFuture
    [javac]     lower bounds: Object
    [javac]   where T is a type-variable:

Может кто-нибудь сказать мне, что я здесь делаю не так? И, пожалуйста, поправьте меня, если я иду в неправильном направлении.

Спасибо.

Ответы [ 2 ]

2 голосов
/ 24 апреля 2019

Вместо

acceptEither(timeoutAfter(50, TimeUnit.MILLISECONDS), inPutItemGroup))

вам понадобится

applyToEither(timeoutAfter(50, TimeUnit.MILLISECONDS), x -> inPutItemGroup)

для компиляции кода.«Accept» - это действие, которое потребляет значение без возврата нового значения, «apply» - это действие, которое создает новое значение.

Однако логическая ошибка все еще существует.Будущее, возвращаемое timeoutAfter, будет завершено исключительно , поэтому зависимые этапы также будут выполняться исключительно, без оценки функций, поэтому этот метод связывания не подходит для замены исключения значением по умолчанию.

Еще хуже, исправление этого приведет к созданию нового будущего, которое будет завершено любым исходным будущим, но это не влияет на действие result.add(itemGroup), выполняемое в одном из исходных фьючерсов.В вашем коде результирующее будущее используется только для ожидания завершения, но не для оценки результата.Поэтому, когда истечет время ожидания, вы перестанете ждать завершения, в то время как все еще могут быть фоновые потоки, изменяющие список.

Правильная логика состоит в том, чтобы отделить этапы извлечения значения, которые могут быть замененызначение по умолчанию по истечении времени ожидания и шаг добавления результата, либо извлеченного значения, либо значения по умолчанию, в список результатов.Затем вы можете дождаться завершения всех add действий.По тайм-ауту могут продолжаться getUpdatedItemGroup оценки (нет способа остановить их выполнение), но их результат будет игнорироваться, поэтому он не влияет на список результатов.

Это также стоитуказывая на то, что создание нового ScheduledExecutorService для каждого элемента списка (который не отключается после использования, что еще хуже) не является правильным подходом.

// result must be effectively final
List<ItemGroup> result = Collections.synchronizedList(new ArrayList<>());
List<ItemGroup> endResult = result;
ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1);
try {
    CompletableFuture<?>[] completableFutures = response.getItemGroupList().stream()
    .map(inPutItemGroup ->
        timeoutAfter(delayer, 50, TimeUnit.MILLISECONDS,
            CompletableFuture.supplyAsync(
                () -> getUpdatedItemGroup(inPutItemGroup), executorService),
            inPutItemGroup)
         .thenAccept(itemGroup -> {
            // this is thread safe, but questionable,
            // e.g. the result list order is not maintained
            if(null != itemGroup) result.add(itemGroup);
         })
    )
    .toArray(CompletableFuture<?>[]::new);

    // this will wait till all threads are completed
    CompletableFuture.allOf(completableFutures).join();
} catch(final Throwable t) {
    String errorMsg = String.format("Exception occurred while executing parallel call");
    log.error(errorMsg, e);
    endResult = response.getItemGroupList();
}
finally {
    delayer.shutdown();
}

Response finalResponse = Response.builder()
    .itemGroupList(endResult)
    .build();
private <T> CompletableFuture<T> timeoutAfter(ScheduledExecutorService es,
    long timeout, TimeUnit unit, CompletableFuture<T> f, T value) {

    es.schedule(() -> f.complete(value), timeout, unit);
    return f;
}

Здесь supplyAsync производит CompletableFuture, который даст результат оценки getUpdatedItemGroup.Вызов timeoutAfter запланирует завершение со значением по умолчанию после тайм-аута, без создания нового будущего, затем зависимое действие, связанное через thenAccept, добавит значение результата в список result.

Обратите внимание, что synchronizedList позволяет добавлять элементы из нескольких потоков, но добавление из нескольких потоков приведет к непредсказуемому порядку, не связанному с порядком списка источников.

0 голосов
/ 24 апреля 2019

Подпись acceptEither выглядит следующим образом:

public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, 
    Consumer<? super T> action
) {

И строка, которая выдает ошибку, выглядит следующим образом:

.acceptEither(
     timeoutAfter(50, TimeUnit.MILLISECONDS),
     inPutItemGroup
)

Итак, вы видите, что вы пытаетесьпередайте ItemGroup как Consumer<? super T>, где T было выведено как Void, и, следовательно, вы получите ожидаемую ошибку:

error: incompatible types: ItemGroup cannot be converted to Consumer<? super Void>
...