Моно против CompletableFuture - PullRequest
       11

Моно против CompletableFuture

0 голосов
/ 25 февраля 2019

CompletableFuture выполняет задачу в отдельном потоке (использует пул потоков) и предоставляет функцию обратного вызова.Допустим, у меня есть вызов API в CompletableFuture.Это блокировка вызова API?Будет ли поток заблокирован, пока он не получит ответ от API?(Я знаю, что основной поток / поток Tomcat будет неблокирующим, но как насчет потока, в котором выполняется задача CompletableFuture?)

Насколько мне известно, Mono полностью неблокирует.

Пожалуйста, пролите немного света на это и поправьте меня, если я ошибаюсь.

1 Ответ

0 голосов
/ 25 февраля 2019

CompletableFuture - Async.Но не является ли он неблокирующим?

То, что верно в отношении CompletableFuture, - это то, что он действительно асинхронный, он позволяет вам выполнять вашу задачу асинхронно из потока вызывающего, а API, такой как thenXXX, позволяет вам обрабатыватьрезультат, когда он становится доступным.С другой стороны, CompletableFuture не всегда неблокирует.Например, при запуске следующего кода он будет выполняться асинхронно по умолчанию ForkJoinPool:

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
});

Очевидно, что Thread в ForkJoinPool, который выполняет задачу, будет заблокированв конечном итоге это означает, что мы не можем гарантировать, что вызов будет неблокирующим.

С другой стороны, CompletableFuture предоставляет API, который позволяет сделать его действительно неблокирующим.

Например, вы всегда можете сделать следующее:

public CompletableFuture myNonBlockingHttpCall(Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> {
        if(exception != null) {
            uncompletedFuture.completeExceptionally(exception);
            return;
        }
        uncompletedFuture.complete(result);
    })

    return uncompletedFuture;
}

Как видите, API CompletableFuture future предоставляет вам методы complete и completeExceptionally, которые завершают выполнение всякий раз, когда онтребуется без блокировки какого-либо потока.

Mono vs CompletableFuture

В предыдущем разделе мы получили обзор поведения CF, но в чем заключается центральное различие между CompletableFuture и Mono?

Стоит отметить, что мы можем также блокировать Mono.Никто не мешает нам написать следующее:

Mono.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
})

Конечно, как только мы подпишемся на будущее, поток вызывающих будет заблокирован.Но мы всегда можем обойти это, предоставив дополнительный оператор subscribeOn.Тем не менее, более широкий API Mono не является ключевым будущим.

Чтобы понять основное различие между CompletableFuture и Mono, давайте вернемся к ранее упомянутой реализации метода myNonBlockingHttpCall.

public CompletableFuture myUpperLevelBusinessLogic() {
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    }

   return future;
}

В случае CompletableFuture после вызова метода он с нетерпением выполнит HTTP-вызов другой службы / ресурса.Даже если нам не понадобится результат выполнения после проверки некоторых предварительных / последующих условий, он запускает выполнение, и для этой работы будут выделены дополнительные ресурсы CPU / DB-Connections / What-Ever-Machine-Resources.

Напротив, тип Mono является ленивым по определению:

public Mono myNonBlockingHttpCallWithMono(Object someData) {
    return Mono.create(sink -> {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            })
    });
} 

public Mono myUpperLevelBusinessLogic() {
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    }

   return mono;
}

В этом случае ничего не произойдет, пока не будет подписан окончательный mono.Таким образом, только когда Mono, возвращенный методом myNonBlockingHttpCallWithMono, будет подписан, будет выполнена логика, предоставленная Mono.create(Consumer).

И мы можем пойти еще дальше.Мы можем сделать наше исполнение намного ленивее.Как вы, возможно, знаете, Mono расширяет Publisher из спецификации Reactive Streams.Критическое будущее Reactive Streams - поддержка противодавления.Таким образом, используя API Mono, мы можем выполнять выполнение только тогда, когда данные действительно нужны, и наш подписчик готов их использовать:

Mono.create(sink -> {
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> {
        if(!once.get() && once.compareAndSet(false, true) {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            });
        }
    });
});

В этом примере мы выполняем данные только тогда, когда подписчик вызывает Subscription#request, сделав это, заявив о готовности получить данные.

Сводка

  • CompletableFuture асинхронно и может быть неблокирующим
  • CompletableFutureстремитсяВы не можете отложить казнь.Но вы можете отменить их (что лучше, чем ничего)
  • Mono является асинхронным / неблокирующим и может легко выполнить любой вызов на другом Thread, составив основной Mono с различными операторами.
  • Mono действительно ленив и позволяет отложить запуск выполнения из-за присутствия подписчика и его готовности использовать данные.
...