Как избежать перегрузки / блокирования / взаимоблокировки службы executor с рекурсивным вызовом - PullRequest
1 голос
/ 12 ноября 2019

Все потоки в ExecutorService заняты задачами, ожидающими задачи, которые застряли в очереди службы executor.

Пример кода:

ExecutorService es=Executors.newFixedThreadPool(8);
        Set<Future<Set<String>>> outerSet=new HashSet<>();
        for(int i=0;i<8;i++){
            outerSet.add(es.submit(new Callable<Set<String>>() {

                @Override
                public Set<String> call() throws Exception {
                    Thread.sleep(10000); //to simulate work
                    Set<Future<String>> innerSet=new HashSet<>();
                    for(int j=0;j<8;j++) {
                        int k=j;
                        innerSet.add(es.submit(new Callable<String>() {
                            @Override
                            public String call() throws Exception {
                                return "number "+k+" in inner loop";
                            }

                        }));
                    }
                    Set<String> out=new HashSet<>();
                    while(!innerSet.isEmpty()) {            //we are stuck at this loop because all the
                        for(Future<String> f:innerSet) {    //callable in innerSet are stuckin the queue
                            if(f.isDone()) {                //of es and can't start since all the threads
                                out.add(f.get());           //in es are busy waiting for them to finish
                            }
                        }
                    }
                    return out;
                }
            }));
        }

Есть ли способчтобы избежать этого, кроме как создания большего количества пулов потоков для каждого слоя или наличия пула потоков, размер которого не фиксирован?

Практическим примером будет, если некоторые вызовы передаются в ForkJoinPool.commonPool (), а затем эти задачииспользуйте объекты, которые также передаются в commonPool одним из их методов.

Ответы [ 3 ]

4 голосов
/ 12 ноября 2019

Вы должны использовать ForkJoinPool. Это было сделано для этой ситуации.

В то время как ваше решение постоянно блокирует поток, пока он ожидает завершения своих подзадач, кража работы ForkJoinPool может выполнять работу, находясь в join(). Это делает его эффективным для подобных ситуаций, когда у вас может быть переменное число небольших (и часто рекурсивных) задач, которые выполняются. С обычным пулом потоков вам нужно было бы увеличить его, чтобы убедиться, что у вас не закончились потоки.

С CompletableFuture вам нужно обрабатывать гораздо больше фактического планирования / планирования самостоятельнои будет сложнее настроить, если вы решите что-то изменить. С FJP единственное, что вам нужно настроить - это количество потоков в пуле, с CF вам нужно подумать о then против thenAsync.

1 голос
/ 12 ноября 2019

Я бы порекомендовал попытаться разложить работу на этапы завершения с помощью CompletableFuture

CompletableFuture.supplyAsync(outerTask) .thenCompose(CompletableFuture.allOf(innerTasks)

Таким образом, ваша внешняя задача не затягивает поток выполнения при обработке внутренних задач, но вы по-прежнему получаете будущее, которое разрешается, когда вся работа завершена. Хотя может быть трудно разделить эти этапы, если они слишком тесно связаны.

0 голосов
/ 12 ноября 2019

Подход, который вы предлагаете, который в основном основан на гипотезе о том, что существует возможное разрешение, если число потоков превышает количество задач, здесь не сработает, если вы уже выделяете пул потоков. Вы можете попробовать это, чтобы увидеть это. Это простой случай тупика, как вы указали в комментариях к вашему коду.

В таком случае используйте два отдельных пула потоков, один для внешнего и другой для внутреннего. И когда задача из внутреннего пула завершится, просто верните значение обратно во внешнее.

Или вы можете просто создать поток на лету, выполнить всю работу в нем, получить результат и вернуть его обратно во внешний интерфейс.

...