Попробуйте ждать CompletableFuture - PullRequest
1 голос
/ 16 января 2020

Есть ли способ попытаться подождать CompletableFuture определенное количество времени, прежде чем вернуть другой результат без отмены будущего после истечения времени ожидания?

У меня есть сервис (назовем его expensiveService), который работает, чтобы делать свое дело. Он возвращает результат:

enum Result {
    COMPLETED,
    PROCESSING,
    FAILED
}

Я готов [заблокировать и] ждать его в течение короткого промежутка времени (скажем, 2 с). Если он не заканчивается sh, я хочу вернуть другой результат, но я хочу, чтобы служба продолжала делать свое дело. Тогда клиент должен был бы узнать, закончен ли сервис (например, через веб-сокеты или что-то еще).

Т.е. у нас есть следующие случаи:

  • expensiveService.processAndGet() занимает 1 с и завершает свое будущее. Возвращает COMPLETED.
  • expensiveService.processAndGet() через 1 с. Возвращение FAILED.
  • expensiveService.processAndGet() занимает 5 с и завершает свое будущее. Возвращает PROCESSING. Если мы попросим другую службу о результате, мы получим COMPLETED.
  • expensiveService.processAndGet() через 5 с. Возвращает PROCESSING. Если мы запросим результат у другой службы, мы получим FAILED.

. В этом конкретном случае c нам фактически нужно в любом случае извлечь текущий объект результата по тайм-ауту, что приведет к следующему дополнительный крайний корпус. Это вызывает некоторые проблемы с решениями, предложенными ниже:

  • expensiveService.processAndGet() занимает 2,01 с и завершает свое будущее. Он возвращает либо PROCESSING или COMPLETED.

Я также использую Vavr и открыт для предложений по использованию Vavr Future.

Мы создали три возможных решения, которые у всех есть свои плюсы и минусы:

# 1 Ждите другого будущего

CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            Thread.sleep(2000);
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());

Проблемы

  1. Второй resultService всегда вызывается.
  2. Мы берем весь поток за 2 с.

# 1a Ждем другого будущего, которое проверяет первое будущее

CompletableFuture<Result> f = expensiveService.processAndGet();
return f.applyToEither(Future.of(() -> {
            int attempts = 0;
            int timeout = 20;
            while (!f.isDone() && attempts * timeout < 2000) {
                Thread.sleep(timeout);
                attempts++;
            }
            return null;
        }).map(v -> resultService.get(processId)).toCompletableFuture(),
        Function.identity());

Проблемы

  1. Второй resultService по-прежнему всегда вызывается.
  2. Нам нужно передать первое Будущее второму, которое не так чисто.

# 2 Object.notify

Object monitor = new Object();
CompletableFuture<Upload> process = expensiveService.processAndGet();
synchronized (monitor) {
    process.whenComplete((r, e) -> {
        synchronized (monitor) {
            monitor.notifyAll();
        }
    });
    try {
        int attempts = 0;
        int timeout = 20;
        while (!process.isDone() && attempts * timeout < 2000) {
            monitor.wait(timeout);
            attempts++;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
if (process.isDone()) {
    return process.toCompletableFuture();
} else {
    return CompletableFuture.completedFuture(resultService.get(processId));
}

Проблемы

  1. Сложный код (возможно наличие ошибок, они не читаются).

# 3 Вавр Future.await

return Future.of(() -> expensiveService.processAndGet()
        .await(2, TimeUnit.SECONDS)
        .recoverWith(e -> {
            if (e instanceof TimeoutException) {
                return Future.successful(resultService.get(processId));
            } else {
                return Future.failed(e);
            }
        })
        .toCompletableFuture();

Проблемы

  1. Нужно будущее в будущем, чтобы избежать await отмены внутреннего будущего.
  2. Перемещение первого будущего во второй нарушает [унаследованный] код T шляпа опирается на ThreadLocal с.
  3. recoverWith и ловить TimeoutException не так уж и элегантно.

# 4 CompletableFuture.orTimeout

return expensiveService.processAndGet()
        .orTimeout(2, TimeUnit.SECONDS)
        .<CompletableFuture<Upload>>handle((u, e) -> {
            if (u != null) {
                return CompletableFuture.completedFuture(u);
            } else if (e instanceof TimeoutException) {
                return CompletableFuture.completedFuture(resultService.get(processId));
            } else {
                return CompletableFuture.failedFuture(e);
            }
        })
        .thenCompose(Function.identity());

Проблемы

  1. Хотя в моем случае будущее processAndGet не отменяется, согласно документации, оно должно быть.
  2. Обработка исключений не очень хорошая.

# 5 CompletableFuture.completeOnTimeout

return expensiveService.processAndGet()
        .completeOnTimeout(null, 2, TimeUnit.SECONDS)
        .thenApply(u -> {
            if (u == null) {
                return resultService.get(processId);
            } else {
                return u;
            }
        });

Проблемы

  1. Хотя в моем случае будущее processAndGet не завершено, согласно документам, так и должно быть.
  2. Что, если processAndGet хотел бы вернуть null как другое состояние?

Все эти решения имеют недостатки и требуют дополнительного кода, но это похоже на то, что должно поддерживаться либо CompletableFuture, либо Future Vavr "из коробки". Есть ли лучший способ сделать это?

1 Ответ

3 голосов
/ 20 января 2020

Стоит сначала указать, как работает CompletableFuture (или почему он назван так, как он есть):

CompletableFuture<?> f = CompletableFuture.supplyAsync(supplier, executionService);

в основном эквивалентно

CompletableFuture<?> f = new CompletableFuture<>();
executionService.execute(() -> {
    if(!f.isDone()) {
        try {
            f.complete(supplier.get());
        }
        catch(Throwable t) {
            f.completeExceptionally(t);
        }
    }
});

Нет подключение от CompletableFuture к коду, выполняемому Executor, фактически, мы можем иметь произвольное количество текущих попыток завершения. Тот факт, что конкретный код предназначен для завершения экземпляра CompletableFuture, становится очевидным только при вызове одного из методов завершения.

Следовательно, CompletableFuture никак не может повлиять на выполняемую операцию, это включает прерывание при отмене или подобное. Как документация CompletableFuture гласит:

Метод Отмена имеет тот же эффект, что и completeExceptionally(new CancellationException())

Так отмена - это просто еще одна попытка завершения, которая выиграет, если она будет первой, но не повлияет на любую другую попытку завершения.

Так что orTimeout(long timeout, TimeUnit unit) не сильно отличается в этом отношении. По истечении времени ожидания он выполнит эквивалент completeExceptionally(new TimeoutException()), который выиграет, если никакая другая попытка завершения не была быстрее, что повлияет на зависимые этапы, но не на другие текущие попытки завершения, например, которые expensiveService.processAndGet() инициировали в вашем случае.

Вы можете выполнить желаемую операцию, например

CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture<Upload> alternative = CompletableFuture.supplyAsync(
    () -> resultService.get(processId), CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS));
return future.applyToEither(alternative, Function.identity())
    .whenComplete((u,t) -> alternative.cancel(false));

. С delayedExecutor мы используем то же средство, что и orTimeout и completeOnTimeout. Он не оценивает указанное Supplier до указанного времени или не выполняет его вообще, если отмена в future.whenComplete выполняется быстрее. applyToEither обеспечит, какой бы результат не был получен быстрее.

Это не завершает future по таймауту, но, как уже было сказано, его завершение не повлияет на исходные вычисления, так что это также будет работать :

CompletableFuture<Upload> future = expensiveService.processAndGet();
CompletableFuture.delayedExecutor(2, TimeUnit.SECONDS)
    .execute(() -> {
        if(!future.isDone()) future.complete(resultService.get(processId));
    });
return future;

это завершает будущее после тайм-аута, как уже говорилось, не затрагивая текущие вычисления, но предоставляя вызывающему альтернативный результат, но не будет распространять исключения, выданные на resultService.get(processId) возвращаемому будущее.

...