Понимание алгоритма кражи работы - PullRequest
2 голосов
/ 26 июня 2019

Я много читал об этом алгоритме.При объяснении этого алгоритма слова: алгоритм кражи работы [закрыто]

Эти разветвленные подзадачи могут сами рекурсивно создавать больше подзадач и таким образом заполнять рабочие очереди параллельно работающих потоков,Если один поток завершен и ему больше нечего делать, он может «украсть» работу из очереди другого потока.

Я понимаю, что этот алгоритм новый и не существует в Executors.newCachedThreadPool /Executors.newFixedThreadPool
Я хотел бы видеть, что один поток обрабатывает только работу из своей очереди.Я создал небольшую программу, которая создает потоки рекурсивно.увидеть ниже.Как я могу видеть, что он не использует алгоритм кражи работы?

public static void main(String[] args) throws Exception {
    int[] myArray = IntStream.rangeClosed(1, 10).toArray();

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CustomCallable customCallable = new CustomCallable(myArray, executorService);
        customCallable.call();
        executorService.shutdownNow();
}
public class CustomCallable implements Callable<Integer> {
    private static final int THRESHOLD = 2;
    private int[] array;
    private ExecutorService executorService;

    public CustomCallable(int[] array, ExecutorService executorService) {
        this.array = array;
        this.executorService = executorService;
    }

    @Override
    public Integer call() throws Exception {
        int sum = 0;
        log.debug(" start [{}] ", Arrays.toString(array) );
        if (array.length > THRESHOLD) {
            List<Callable<Integer>> dividedTasks = createSubtasks(array, executorService);
            sum = executorService.invokeAll(dividedTasks).stream()
                    .mapToInt(feature -> {
                        try {
                            return feature.get();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        } catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                        return 0;
                    })
                    .sum();
       } else {
            sum = processing(array);
        }
        log.debug(" sum[{}]={} ", Arrays.toString(array) ,sum);
        return sum;
    }

    private List<Callable<Integer>> createSubtasks(int[] array, ExecutorService executorService) {
        int[] arr1 = Arrays.copyOfRange(array, 0, array.length / 2);
        int[] arr2 = Arrays.copyOfRange(array, array.length / 2, array.length);
        List<Callable<Integer>> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomCallable(arr1, executorService));
        dividedTasks.add(new CustomCallable(arr2, executorService));
        return dividedTasks;
    }

    private Integer processing(int[] array) {
        int result = Arrays.stream(array)
                .sum();
        return result;
    }
}

Это вывод:

[main] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]  
[pool-2-thread-1] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5]]  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  start [[1, 2]]**  
[pool-2-thread-2] DEBUG com.example.CustomCallable -  start [[6, 7, 8, 9, 10]]  
[pool-2-thread-4] DEBUG com.example.CustomCallable -  start [[3, 4, 5]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[6, 7]]  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  sum[[1, 2]]=3**  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[6, 7]]=13  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  start [[8, 9, 10]]**  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[3]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[3]]=3  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[4, 5]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[4, 5]]=9  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[8]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[8]]=8  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  start [[9, 10]]  
[pool-2-thread-5] DEBUG com.example.CustomCallable -  sum[[9, 10]]=19  
**[pool-2-thread-3] DEBUG com.example.CustomCallable -  sum[[8, 9, 10]]=27**  
[pool-2-thread-4] DEBUG com.example.CustomCallable -  sum[[3, 4, 5]]=12  
[pool-2-thread-2] DEBUG com.example.CustomCallable -  sum[[6, 7, 8, 9, 10]]=40  
[pool-2-thread-1] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5]]=15  
[main] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]=55 


Как вы можете видеть:
mainсработала нить-2 и нить-1
нить-1 сработала нить-3 (вычисление [1,2]) и нить-4
нить-2 сработала нить-5 и нить-3 (вычисление [1,2] после окончания расчета [8,9,10])
Как я понимаю, нить-3 обрабатывает работу из нити-1 и нити-2.
выглядит как нить-3 кража работы
Если я изменю его на

 ExecutorService executorService = Executors.newWorkStealingPool();

, который поддерживает алгоритм рабочего стола, что я вижу по-другому?

[main] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]  
[ForkJoinPool-1-worker-1] DEBUG com.example.CustomCallable -  start [[1, 2, 3, 4, 5]]  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  start [[1, 2]]  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  sum[[1, 2]]=3  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  start [[3, 4, 5]]  
[ForkJoinPool-1-worker-3] DEBUG com.example.CustomCallable -  start [[6, 7, 8, 9, 10]]  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  start [[6, 7]]  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  sum[[6, 7]]=13  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  start [[8, 9, 10]]  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[3]]**  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[3]]=3**  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[4, 5]]  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[4, 5]]=9  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[8]]**  
**[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[8]]=8**  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  start [[9, 10]]  
[ForkJoinPool-1-worker-4] DEBUG com.example.CustomCallable -  sum[[9, 10]]=19  
[ForkJoinPool-1-worker-2] DEBUG com.example.CustomCallable -  sum[[3, 4, 5]]=12  
[ForkJoinPool-1-worker-0] DEBUG com.example.CustomCallable -  sum[[8, 9, 10]]=27  
[ForkJoinPool-1-worker-1] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5]]=15  
[ForkJoinPool-1-worker-3] DEBUG com.example.CustomCallable -  sum[[6, 7, 8, 9, 10]]=40  
[main] DEBUG com.example.CustomCallable -  sum[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]=55


Как вы можете видеть, у нас есть 5 рабочих и 4 рабочих, которые запускаются работником-3 и работником-1.Чем это отличается от предыдущего выполнения?

Вы можете скачать код с github

...