CompletableFuture - Async.Но не является ли он неблокирующим?
То, что верно в отношении CompletableFuture, - это то, что он действительно асинхронный, он позволяет вам выполнять вашу задачу асинхронно из потока вызывающего, а API, такой как thenXXX
, позволяет вам обрабатыватьрезультат, когда он становится доступным.С другой стороны, CompletableFuture
не всегда неблокирует.Например, при запуске следующего кода он будет выполняться асинхронно по умолчанию ForkJoinPool
:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
});
Очевидно, что Thread
в ForkJoinPool
, который выполняет задачу, будет заблокированв конечном итоге это означает, что мы не можем гарантировать, что вызов будет неблокирующим.
С другой стороны, CompletableFuture
предоставляет API, который позволяет сделать его действительно неблокирующим.
Например, вы всегда можете сделать следующее:
public CompletableFuture myNonBlockingHttpCall(Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
return uncompletedFuture;
}
Как видите, API CompletableFuture
future предоставляет вам методы complete
и completeExceptionally
, которые завершают выполнение всякий раз, когда онтребуется без блокировки какого-либо потока.
Mono vs CompletableFuture
В предыдущем разделе мы получили обзор поведения CF, но в чем заключается центральное различие между CompletableFuture и Mono?
Стоит отметить, что мы можем также блокировать Mono.Никто не мешает нам написать следующее:
Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
})
Конечно, как только мы подпишемся на будущее, поток вызывающих будет заблокирован.Но мы всегда можем обойти это, предоставив дополнительный оператор subscribeOn
.Тем не менее, более широкий API Mono
не является ключевым будущим.
Чтобы понять основное различие между CompletableFuture
и Mono
, давайте вернемся к ранее упомянутой реализации метода myNonBlockingHttpCall
.
public CompletableFuture myUpperLevelBusinessLogic() {
var future = myNonBlockingHttpCall();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
var errorFuture = new CompletableFuture();
errorFuture.completeExceptionally(new RuntimeException());
return errorFuture;
}
return future;
}
В случае CompletableFuture
после вызова метода он с нетерпением выполнит HTTP-вызов другой службы / ресурса.Даже если нам не понадобится результат выполнения после проверки некоторых предварительных / последующих условий, он запускает выполнение, и для этой работы будут выделены дополнительные ресурсы CPU / DB-Connections / What-Ever-Machine-Resources.
Напротив, тип Mono
является ленивым по определению:
public Mono myNonBlockingHttpCallWithMono(Object someData) {
return Mono.create(sink -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
})
});
}
public Mono myUpperLevelBusinessLogic() {
var mono = myNonBlockingHttpCallWithMono();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
return Mono.error(new RuntimeException());
}
return mono;
}
В этом случае ничего не произойдет, пока не будет подписан окончательный mono
.Таким образом, только когда Mono
, возвращенный методом myNonBlockingHttpCallWithMono
, будет подписан, будет выполнена логика, предоставленная Mono.create(Consumer)
.
И мы можем пойти еще дальше.Мы можем сделать наше исполнение намного ленивее.Как вы, возможно, знаете, Mono
расширяет Publisher
из спецификации Reactive Streams.Критическое будущее Reactive Streams - поддержка противодавления.Таким образом, используя API Mono
, мы можем выполнять выполнение только тогда, когда данные действительно нужны, и наш подписчик готов их использовать:
Mono.create(sink -> {
AtomicBoolean once = new AtomicBoolean();
sink.onRequest(__ -> {
if(!once.get() && once.compareAndSet(false, true) {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
});
}
});
});
В этом примере мы выполняем данные только тогда, когда подписчик вызывает Subscription#request
, сделав это, заявив о готовности получить данные.
Сводка
CompletableFuture
асинхронно и может быть неблокирующим CompletableFuture
стремитсяВы не можете отложить казнь.Но вы можете отменить их (что лучше, чем ничего) Mono
является асинхронным / неблокирующим и может легко выполнить любой вызов на другом Thread
, составив основной Mono
с различными операторами. Mono
действительно ленив и позволяет отложить запуск выполнения из-за присутствия подписчика и его готовности использовать данные.