Как убедиться, что вся цепочка CompletableFuture выполняется внутренним пулом потоков при разработке асинхронного Java API? - PullRequest
1 голос
/ 24 сентября 2019

Я пишу библиотеку, которая предлагает несколько асинхронных методов, которые возвращают CompletableFutures.Библиотека имеет внутренний пул потоков для выполнения вычислительной работы асинхронных методов.

Я хочу убедиться, что выполнены следующие два требования:

  1. возвращаемое CompletableFuture НЕ завершеновнутренним потоком, так что цепочки CompletableFuture, внешние по отношению к моей библиотеке, никогда не выполняются внутренним пулом потоков библиотеки
  2. все вычисления моего асинхронного метода выполняются внутренним пулом потоков, а НЕ пользовательским потоком (то есть потоком).вызывающего метода)

Допустим, библиотека имеет следующий метод блокировки

Data request(Address address) {
  Message request = encode(address);
  Message response = sendAndReceive(request);
  Data responseData = decode(response);
  return responseData;
}

и соответствующий асинхронный метод

  CompletableFuture<Data> requestAsync(Address address) {
    return CompletableFuture.supplyAsync(() -> encode(address), internalThreadPool)
        .thenCompose(request -> sendAndReceiveAsync(request))
        .thenApply(response -> decode(response));
  }

Первое требованиевстречается путем добавления цепочки .whenCompleteAsync((v,t) -> {}), как объяснено в этого ответа .

Но что нужно сделать, чтобы выполнить второе требование?

1 Ответ

2 голосов
/ 24 сентября 2019

Решение для второго требования было обсуждено Сергеем Куксенко здесь и реализовано в реализации HttpClient в Java 11.

Требование не выполнено, поскольку не гарантируется, что decode(response) выполнен внутренней резьбой.Если кодирование и sendAndReceiveAsync завершены быстро, decode

может фактически быть выполнено потоком вызывающей стороны.

*1009* Проблема может быть исправлена ​​введением CompletableFuture startCf для запуска цепочки CF.

Таким образом, полное решение может выглядеть следующим образом

CompletableFuture<Data> requestAsyncFixedAll(Address address) {
  CompletableFuture<Void> startCf = new CompletableFuture<>();
  CompletableFuture<Data> dataCf =  startCf.thenApplyAsync(v -> encode(address), internalThreadPool)
    .thenCompose(request -> sendAndReceiveAsync(request))
    .thenApply(response -> decode(response)).whenCompleteAsync((v, t) -> {});
  startCf.complete(null);
  return dataCf;
}
...