Создать CompletableFuture из блокировки метода вызова - PullRequest
2 голосов
/ 07 июня 2019

Как я могу "преобразовать" вызов метода блокировки в CompletableFuture ?Пример:

T waitForResult() throws InterruptedException {
    obj.await(); // blocking call
    // ...
    return something;
}

Мне нужно превратить это в следующее:

CompletableFuture.of(this::waitForResult); // .of(Callable<T>) doesn't exist

Некоторые моменты, которые следует учитывать:

  1. waitForResult() может вызывать исключения.Они должны обрабатываться правильно, чтобы completableFuture.get() выдавало InterruptedException или ExecutionException.
  2. Не должно быть другого вовлеченного потока (supplyAsync() сделает это).
  3. Это должно быть CompletableFuture (возможно, в оболочке).

Я пробовал это, но это не будет правильно обрабатывать исключения:

CompletableFuture.completedFuture(Void.TYPE).thenApply(v -> {
    try {
        listener.await();
        // ...
        return listener.getResult();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } catch (SnmpException e) {
        throw new RuntimeException(e);
    }
});

Я знаю Создать CompletableFutureиз вызова метода синхронизации , но это мне не помогает:

  • Исходный код в вопросе блокирует основной поток
  • Код в ответах также включает в себятретий поток или неправильно обрабатывает исключения (поправьте меня, если я ошибаюсь)

Ответы [ 3 ]

1 голос
/ 07 июня 2019

Вы можете попробовать это, это большое злоупотребление CompletableFuture, но вы должны решить, приемлемо ли это для вашего случая использования:

private static <T> CompletableFuture<T> supplySynchronously(Callable<T> callable) {
    CompletableFuture<T> f = new CompletableFuture() {

        public T get() throws InterruptedException, ExecutionException {
            synchronized (callable) {
                if (!isDone()) {
                    try {
                        T result = callable.call();
                        complete(result);
                    } catch (Exception e) {
                        completeExceptionally(e);
                    }

                }
            }
            return (T) super.get();
        }
    };
    return f;
}
1 голос
/ 07 июня 2019

Пусть метод Listener.await() вызывает метод CountDownLatch.await():

class Listener {
   CountDownLatch latch = new CountDownLatch(counter);

   void someMethod(){
     latch.countdown();
   }

   public void await() {
      latch.await();
   }
}

, затем вы можете преобразовать его в асинхронный следующим образом:

class Listener {
   AsynCountDownLatch latch = new AsynCountDownLatch(counter);

   void someMethod(){ // here nothing changed
     latch.countdown();
   }

   public CompletableFuture<Void> async() {
      return latch.fin;
   }
}

class AsynCountDownLatch extends AsynCountDownLatch {
   CompletableFuture<Void> fin = new CompletableFuture<>();

   public AsynCountDownLatch(long counter) {
     super(counter);
   }

   public void countdown() {
       super.countdown();
       if (super.getCount()==0L) {
           fin.complete(null);
       }
   }
 }

UPDT: если слушательиспользует другой класс, тогда этот класс также должен быть изменен / расширен / заменен, чтобы преобразовать блокирующие операции в неблокирующие.Универсального способа сделать такое преобразование не существует.

0 голосов
/ 07 июня 2019

Я не уверен, что понимаю ваши требования. Встречается ли это с ними?

private <T> CompletableFuture<T> supplySynchronously(Callable<T> callable) {
    CompletableFuture<T> f = new CompletableFuture<>();
    try {
        f.complete(callable.call());
    } catch (Exception e) {
        f.completeExceptionally(e);
    }
    return f;
}
...