CompletableFuture: правильный способ запустить список фьючерсов, дождаться результата и обработать исключение - PullRequest
0 голосов
/ 10 ноября 2018

У меня есть устаревший код, который содержит дюжину обращений к базе данных, чтобы заполнить отчет, это занимает заметное количество времени, которое я пытаюсь сократить, используя CompletableFuture.

У меня есть некоторые сомнения, что я все делаю правильно и не злоупотребляю этой технологией.

Мой код теперь выглядит так:

  1. Запустить асинхронное заполнение разделов документа множеством вызовов базы данных внутри каждого метода

    CompletableFuture section1Future = CompletableFuture.supplyAsync(() -> populateSection1(arguments));
    CompletableFuture section2Future = CompletableFuture.supplyAsync(() -> populateSection2(arguments));
        ...
    CompletableFuture section1oFuture = CompletableFuture.supplyAsync(() -> populateSection10(arguments));
    
  2. Затем я размещаю фьючерсы в определенном порядке в arrayList и объединяю их все, чтобы убедиться, что мой код будет работать дальше только после завершения всех фьючерсов.

    List<CompletableFuture> futures = Arrays.asList(
                section1Future,
                section2Future, ...
                section10Future);
    
    List<Object> futureResults = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    
  3. Затем я заполняю сам документ PDF его частями

    Optional.ofNullable((PdfPTable) futureResults.get(0)).ifPresent(el -> populatePdfElement(document, el));
    Optional.ofNullable((PdfPTable) futureResults.get(1)).ifPresent(el -> populatePdfElement(document, el));
        ...
    Optional.ofNullable((PdfPTable) futureResults.get(10)).ifPresent(el -> populatePdfElement(document, el));
    

    возврат документа

Мои опасения:

1) Можно ли таким образом создавать и создавать множество завершаемых фьючерсов? Упорядочить их в необходимой последовательности в arrayList, присоединиться к ним, чтобы убедиться, что все они закончены, а затем получить результат, приведя их к определенному объекту?

2) Можно ли работать без указания службы исполнителя, но полагаться на общий ForkJoinPool? Однако этот код выполняется в веб-контейнере, поэтому, вероятно, для использования JTA мне нужно использовать предоставляемый контейнером пул потоков, выполняемый через JNDI?

3) Если этот код окружен try-catch, я смогу перехватить CompletionException в основном потоке, верно? Или для этого я должен объявить каждую функцию следующим образом:

CompletableFuture.supplyAsync(() -> populateSection1(arguments))
    .exceptionally (ex -> {
                    throw new RuntimeException(ex.getCause());
        });

4) Можно ли чрезмерно использовать CompletableFutures, чтобы они сами стали узким местом производительности? Как много фьючерсов ждет одного исполнителя, чтобы начать работать? Как этого избежать? Использовать контейнер, предоставленный исполнителю сервиса? Если да, может кто-нибудь подсказать мне, как правильно настроить службу executor с учетом процессоров и объема памяти?

5) Влияние памяти. Я прочитал в параллельной ветке, что может быть проблема с OOME, так как создается много объектов и собирается мусор. Есть ли лучший способ расчета правильного объема памяти, необходимого для приложения?

1 Ответ

0 голосов
/ 12 ноября 2018

В целом подход не является неправильным, но есть вещи, которые нужно улучшить.

В частности, вы не должны использовать необработанные типы , например, CompletableFuture.

Когда populateSection… возвращает PdfPTable, вы должны последовательно использовать CompletableFuture<PdfPTable> во всем коде.

* 1013 Т.е. *

CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(()  -> populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(()  -> populateSection2(arguments));
    ...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> populateSection10(arguments));

даже если эти методы не объявляют тип возвращаемого значения, которое, как вы предполагаете, всегда возвращаются во время выполнения, вы должны вставить приведение типа на этом раннем этапе:

CompletableFuture<PdfPTable> section1Future = CompletableFuture.supplyAsync(()  -> (PdfPTable)populateSection1(arguments));
CompletableFuture<PdfPTable> section2Future = CompletableFuture.supplyAsync(()  -> (PdfPTable)populateSection2(arguments));
    ...
CompletableFuture<PdfPTable> section10Future = CompletableFuture.supplyAsync(() -> (PdfPTable)populateSection10(arguments));

Затем вы можете использовать

Stream.of(section1Future, section2Future, ..., section10Future)
    .map(CompletableFuture::join)
    .filter(Objects::nonNull)
    .forEachOrdered(el -> populatePdfElement(document, el));

Не используя необработанные типы, вы уже получаете желаемый тип результата и можете выполнять операции 3-го шага, то есть фильтрацию и выполнение последнего действия, прямо в этой потоковой операции.

Если вам все еще нужен список, вы можете использовать

List<PdfPTable> results = Stream.of(section1Future, section2Future, ..., section10Future)
    .map(CompletableFuture::join)
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

results.forEach(el -> populatePdfElement(document, el));

Тем не менее, параллелизм зависит от пула потоков, используемого для операции (указано для supplyAsync). Когда вы не указываете исполнителя, вы получаете пул Fork / Join по умолчанию, используемый параллельными потоками, поэтому в этом конкретном случае вы получите тот же результат, что и

.
List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
    ()  -> populateSection1(arguments),
    ()  -> populateSection2(arguments));
    ...
    () -> populateSection10(arguments)))
    .parallel()
    .map(Supplier::get)
    .filter(Objects::nonNull)
    .forEachOrdered(el -> populatePdfElement(document, el));

или

List<PdfPTable> results = Stream.<Supplier<PdfPTable>>.of(
    ()  -> populateSection1(arguments),
    ()  -> populateSection2(arguments));
    ...
    () -> populateSection10(arguments)))
    .parallel()
    .map(Supplier::get)
    .filter(Objects::nonNull)
    .collect(Collectors.toList());

results.forEach(el -> populatePdfElement(document, el));

Хотя оба варианта гарантируют, что populatePdfElement будет вызываться в правильном порядке и по одному, только последний будет выполнять все вызовы из инициирующего потока.

Что касается обработки исключений, вы получите любое исключение, выданное поставщиком, заключенное в CompletionException при вызове CompletableFuture::join. Цепочка что-то вроде .exceptionally (ex -> { throw new RuntimeException(ex.getCause()); }); не имеет смысла, новый RuntimeException также будет заключен в CompletionException при вызове CompletableFuture::join.

В варианте Stream вы получите исключение без оболочки. Поскольку Supplier не допускает проверенные исключения, возможны только подтипы RuntimeException или Error.

Другие вопросы слишком широки для вопросов и ответов.

...