WorkStealingPool и ThreadPoolExecutor дают разные результаты при использовании с CompletableFuture - PullRequest
0 голосов
/ 25 августа 2018

При выполнении следующего кода:

ExecutorService executorService = Executors.newWorkStealingPool(20);

Function<String, CompletableFuture<String>> requestTask =
        url -> CompletableFuture.supplyAsync(() -> {
                    System.out.println("Request " + requestCount++ + " was sent");
                    HttpClient.get(url);
                    return url;
                    }, executorService);

Function<String, String> extractName = s -> s.replaceAll("(https|http|://|\\.com|www\\.|\\.io)", "");

CompletableFuture[] futures = urls.stream() // urls list contains 14 urls
        .map(requestTask)
        .map(future -> future.thenApply(extractName))
        .map(future -> future.thenAccept(System.out::println))
        .toArray(CompletableFuture[]::new);

CompletableFuture.allOf(futures);
executorService.shutdown();

результат будет следующим:

Request 0 was sent
Request 1 was sent
Request 2 was sent
Request 3 was sent
Request 4 was sent
Request 5 was sent
Process finished with exit code 0

Однако, когда Executors.newWorkStealingPool(20) заменяется на Executors.newFixedThreadPool(20), все запросы отправляются,В чем причина такого поведения?

1 Ответ

0 голосов
/ 25 августа 2018

Не все запросы отправляются, потому что ( предупреждение спойлера! ) JVM просто завершается.

Как вы, возможно, знаете, условия для завершения JVM :1007 *

происходит одно из следующих действий:

  • Был вызван метод exit класса Runtime, и диспетчер безопасности разрешил выполнение операции выхода.
  • Все потоки, которые не являются потоками демона, умерли, либо возвращаясь из вызова метода run, либо выбрасывая исключение, которое распространяется за пределы метода run.

Очевидно, что это не первый случай, поэтому он должен быть вторым.

Первое, на что нужно обратить внимание, это то, что вы позволили своему методу main() выйти:

  • вы звоните CompletableFuture.allOf(), но вы ничего не делаете с результатом, поэтому он не блокируется (без join() вызова);
  • вызов ExecutorService.shutDown() только сообщаетвыполнить до завершения, он не ждет его.

Первоначально основной поток является единственным потоком, не являющимся демоном, поэтому этого должно быть достаточно для выхода из JVM.Но в этом и заключается разница между двумя исполнителями:

  • newFixedThreadPool() реализован с помощью ThreadPoolExecutor, который использует Executors.defaultThreadFactory(), который создает потоки, не являющиеся демонами;
  • newWorkStealingPool() реализован с помощью ForkJoinPool, который вызывает setDaemon(true) во всех потоках, которые он создает¹.

К сожалению, это не документировано, но в основном это сводится кна Почему следующее приложение немедленно завершает работу при использовании ForkJoinPool, а не когда я использую ThreadPoolExecutor?

Итак, два возможных решения вашей проблемы:

  • вызов join() после allOf()
  • вызов awaitTermination() на исполнителя после shutdown()

¹ Как отметил teppic в комментариях ,это не было документировано в Java 8, но это теперь с Java 9 .

...