Решение CompletableFuture для решения Reactor (или Akka) - PullRequest
0 голосов
/ 30 апреля 2018

У меня есть следующий метод, использующий CompletableFuture, например:

public AClass aMethod() {

    CompletableFuture<SomeClassA> someClassAFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsA());
    CompletableFuture<SomeClassB> someClassBFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsB());
    CompletableFuture<SomeClassC> someClassCFuture =
        CompletableFuture.supplyAsync(() -> someMethodThatReturnsC());

    CompletableFuture.allOf(someClassAFuture, someClassBFuture, someClassCFuture).join();

    return new AClass(someClassAFuture.join(), someClassBFuture.join(), someClassCFuture.join());
}

В этом коде возникает проблема взаимоблокировки, когда потоки T одновременно вводят метод, если в пуле объединения вил меньше потоков, чем T * 3 (поскольку ни один из вызовов allOf не может быть завершен, и они не вернутся к пул текущих занятых тем).

Единственный способ решить эту проблему - ограничить одновременные потоки внутри метода (используя аннотацию Spring @Async с исполнителем потоков) или увеличить потоки в пуле соединений вилки.

Я хотел бы найти лучшее решение, в котором я мог бы полностью забыть о размере пула потоков. Как я могу переписать это с помощью Reactor или Akka?

1 Ответ

0 голосов
/ 30 апреля 2018

Реализация в фьючерсах Akka будет выглядеть примерно так (полностью не проверено):

Future< SomeClassA > f1 = future(() -> someMethodThatReturnsA(), system.dispatcher());
Future< SomeClassB > f2 = future(() -> someMethodThatReturnsB(), system.dispatcher());
Future< SomeClassC > f3 = future(() -> someMethodThatReturnsC(), system.dispatcher());

List<Future<Object>> futures = Arrays.asList(f1, f2, f3);

return sequence(futures).map((results) ->  new AClass(results.get(0),results.get(1),results.get(2)));

Может потребоваться дополнительная работа для анализа результата фьючерса перед созданием AClass. Обратите внимание, что вы возвращаете Future< AClass > сейчас в aMethod

Хотя проблема с вашим кодом в том, что он блокируется. Вы пытались объединить все CompletableFutures, используя thenApply и thenCompose, чтобы вернуть CompletableFuture<AClass>?

...