Многопоточная быстрая сортировка или слияние - PullRequest
25 голосов
/ 05 февраля 2010

Как я могу реализовать параллельный алгоритм быстрой сортировки или слияния для Java?

У нас были проблемы с 16- (виртуальным) ядром Mac, на котором работало только одно ядро ​​(!) С использованием алгоритма сортировки Java по умолчанию, и было бы нехорошо видеть, что эта очень тонкая машина полностью не используется , Таким образом, мы написали свою собственную (я написал ее), и мы действительно добились хороших ускорений (я написал многопотоковую быструю сортировку, и из-за ее природы разбиения она очень хорошо распараллеливается, но я мог бы также написать слияние), но моя реализация только масштабируется до 4 потоков, это проприетарный код, и я бы предпочел использовать один из авторитетных источников вместо моего заново изобретенного колеса.

Единственный пример, который я нашел в Интернете, - это пример того, как не для написания многопоточной быстрой сортировки на Java, это циклическое занятие (что действительно ужасно) с использованием:

while (helpRequested) { }

http://broadcast.oreilly.com/2009/06/may-column-multithreaded-algor.html

Таким образом, помимо потери одного потока без какой-либо причины, он обязательно убивает перфекты с помощью занятого цикла в цикле while (что ошеломляет).

Отсюда мой вопрос: знаете ли вы о какой-либо правильно многопоточной реализации быстрой сортировки или быстрой сортировки в Java, которая поступила бы из надежного источника?

Я подчеркиваю тот факт, что я знаю, что сложность остается O (n log n), но мне все равно очень приятно было бы увидеть, как все эти ядра начинают работать вместо холостого хода. Обратите внимание, что для других задач на тех же 16 виртуальных ядрах Mac я увидел ускорение до x7 за счет распараллеливания кода (и я ни в коем случае не эксперт по параллелизму).

Так что, даже несмотря на то, что сложность остается O (n log n), я бы действительно оценил ускорение x7, x8 или даже x16.

Ответы [ 8 ]

20 голосов
/ 05 февраля 2010

попробуйте fork / join framework от Doug Lea :

public class MergeSort extends RecursiveAction {
    final int[] numbers;
    final int startPos, endPos;
    final int[] result;

    private void merge(MergeSort left, MergeSort right) {
        int i=0, leftPos=0, rightPos=0, leftSize = left.size(), rightSize = right.size();
        while (leftPos < leftSize && rightPos < rightSize)
            result[i++] = (left.result[leftPos] <= right.result[rightPos])
                ? left.result[leftPos++]
                : right.result[rightPos++];
        while (leftPos < leftSize)
            result[i++] = left.result[leftPos++];
        while (rightPos < rightSize)
        result[i++] = right.result[rightPos++];
    }

    public int size() {
        return endPos-startPos;
    }

    protected void compute() {
        if (size() < SEQUENTIAL_THRESHOLD) {
            System.arraycopy(numbers, startPos, result, 0, size());
            Arrays.sort(result, 0, size());
        } else {
            int midpoint = size() / 2;
            MergeSort left = new MergeSort(numbers, startPos, startPos+midpoint);
            MergeSort right = new MergeSort(numbers, startPos+midpoint, endPos);
            coInvoke(left, right);
            merge(left, right);
        }
    }
}

(источник: http://www.ibm.com/developerworks/java/library/j-jtp03048.html?S_TACT=105AGX01&S_CMP=LP)

9 голосов
/ 21 сентября 2014

Java 8 предоставляет java.util.Arrays.parallelSort, который сортирует массивы параллельно, используя инфраструктуру fork-join. Документация содержит некоторые подробности о текущей реализации (но это ненормативные примечания):

Алгоритм сортировки - это параллельная сортировка-слияние, которая разбивает массив на подмассивы, которые сами сортируются и затем объединяются. Когда длина вложенного массива достигает минимальной степени детализации, вложенный массив сортируется с использованием соответствующего метода Arrays.sort. Если длина указанного массива меньше минимальной гранулярности, он сортируется с использованием соответствующего метода Arrays.sort. Алгоритм требует рабочее пространство не больше, чем размер исходного массива. Общий пул ForkJoin используется для выполнения любых параллельных задач.

Кажется, что нет соответствующего метода параллельной сортировки для списков (хотя RandomAccess списки должны хорошо работать с сортировкой), поэтому вам нужно будет использовать toArray, сортировать этот массив сохранить результат обратно в список. (Я задал вопрос об этом здесь .)

7 голосов
/ 05 января 2011

Извините, но то, что вы просите, невозможно. Я полагаю, что кто-то еще упомянул, что сортировка связана с IO, и они, скорее всего, правильные. Код от Doug Lea от IBM - хорошая работа, но я считаю, что он предназначен в основном как пример того, как писать код. Если вы заметили в своей статье, он никогда не публиковал тесты для него и вместо этого публиковал тесты для другого рабочего кода, такого как вычисление средних значений и параллельное нахождение минимального максимума. Вот каковы результаты тестов, если вы используете общую сортировку слиянием, быструю сортировку, сортировку по методу Дугса с использованием пула объединяющих форков и ту, которую я написал с помощью пула быстрого объединения с объединением. Вы увидите, что сортировка слиянием лучше всего подходит для N 100 или меньше. Быстрая сортировка от 1000 до 10000, а быстрая сортировка с использованием пула объединяющих вил превосходит все остальные, если у вас 100000 и выше. Эти тесты состояли из массивов случайных чисел, которые выполнялись 30 раз для создания среднего значения для каждой точки данных и выполнялись на четырехъядерном ядре с примерно 2 гигабайтами оперативной памяти. И ниже у меня есть код для быстрой сортировки. Это в основном показывает, что, если вы не пытаетесь отсортировать очень большой массив, вам следует отказаться от попыток улучшить алгоритм сортировки кодов, поскольку параллельные работают очень медленно на маленьких N.

Merge Sort
10  7.51E-06
100 1.34E-04
1000    0.003286269
10000   0.023988694
100000  0.022994328
1000000 0.329776132


Quick Sort
5.13E-05
1.60E-04
7.20E-04
9.61E-04
0.01949271
0.32528383


Merge TP
1.87E-04
6.41E-04
0.003704411
0.014830678
0.019474009
0.19581768

Quick TP
2.28E-04
4.40E-04
0.002716065
0.003115251
0.014046681
0.157845389

import jsr166y.ForkJoinPool;
import jsr166y.RecursiveAction;

//  derived from
//  http://www.cs.princeton.edu/introcs/42sort/QuickSort.java.html
//  Copyright © 2007, Robert Sedgewick and Kevin Wayne.
//  Modified for Join Fork by me hastily. 
public class QuickSort {

    Comparable array[];
    static int limiter = 10000;

    public QuickSort(Comparable array[]) {
        this.array = array;
    }

    public void sort(ForkJoinPool pool) {
        RecursiveAction start = new Partition(0, array.length - 1);        
        pool.invoke(start);
    }

    class Partition extends RecursiveAction {

        int left;
        int right;

        Partition(int left, int right) {
            this.left = left;
            this.right = right;
        }

        public int size() {
            return right - left;
        }

        @SuppressWarnings("empty-statement")
        //void partitionTask(int left, int right) {
        protected void compute() {
            int i = left, j = right;
            Comparable tmp;
            Comparable pivot = array[(left + right) / 2];

            while (i <= j) {
                while (array[i].compareTo(pivot) < 0) {
                    i++;
                }
                while (array[j].compareTo(pivot) > 0) {
                    j--;
                }

                if (i <= j) {
                    tmp = array[i];
                    array[i] = array[j];
                    array[j] = tmp;
                    i++;
                    j--;
                }
            }


            Partition leftTask = null;
            Partition rightTask = null;

            if (left < i - 1) {
                leftTask = new Partition(left, i - 1);
            }
            if (i < right) {
                rightTask = new Partition(i, right);
            }

            if (size() > limiter) {
                if (leftTask != null && rightTask != null) {
                    invokeAll(leftTask, rightTask);
                } else if (leftTask != null) {
                    invokeAll(leftTask);
                } else if (rightTask != null) {
                    invokeAll(rightTask);
                }
            }else{
                if (leftTask != null) {
                    leftTask.compute();
                }
                if (rightTask != null) {
                    rightTask.compute();
                }
            }
        }
    }
}
1 голос
/ 18 августа 2011

Только что закодировал вышеупомянутую MergeSort и производительность была очень плохой.

Кодовый блок ссылается на «coInvoke (слева, справа);» но не было никакой ссылки на это и заменил его invokeAll (слева, справа);

Тестовый код:

MergeSort mysort = new MyMergeSort(array,0,array.length);
ForkJoinPool threadPool = new ForkJoinPool();
threadPool.invoke(mysort);

но пришлось остановить его из-за плохой работы.

Я вижу, что вышеприведенной статье почти год, и, возможно, сейчас все изменилось.

Я нашел код в альтернативной статье для работы: http://blog.quibb.org/2010/03/jsr-166-the-java-forkjoin-framework/

0 голосов
/ 21 августа 2018
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class IQ1 {
    public static void main(String[] args) {
        // Get number of available processors
        int numberOfProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("Number of processors : " + numberOfProcessors);
        // Input data, it can be anything e.g. log records, file records etc
        long[][] input = new long[][]{
              { 5, 8, 9, 14, 20 },
              { 17, 56, 59, 80, 102 },
              { 2, 4, 7, 11, 15 },
              { 34, 37, 39, 45, 50 }
            };

        /* A special thread pool designed to work with fork-and-join task splitting
         * The pool size is going to be based on number of cores available 
         */
        ForkJoinPool pool = new ForkJoinPool(numberOfProcessors);
        long[] result = pool.invoke(new Merger(input,  0, input.length));

        System.out.println(Arrays.toString(result));
    }
    /* Recursive task which returns the result
     * An instance of this will be used by the ForkJoinPool to start working on the problem
     * Each thread from the pool will call the compute and the problem size will reduce in each call
     */
    static class Merger extends RecursiveTask<long[]>{
        long[][] input;
        int low;
        int high;

        Merger(long[][] input, int low, int high){
            this.input = input;
            this.low = low;
            this.high = high;
        }

        @Override
        protected long[] compute() {            
            long[] result = merge();
            return result;
        }

        // Merge
        private long[] merge(){
            long[] result = new long[input.length * input[0].length];
            int i=0;
            int j=0;
            int k=0;
            if(high - low < 2){
                return input[0];
            }
            // base case
            if(high - low == 2){
                long[] a = input[low];
                long[] b = input[high-1];
                result = mergeTwoSortedArrays(a, b);
            }
            else{
                // divide the problem into smaller problems
                int mid = low + (high - low) / 2;
                Merger first = new Merger(input, low, mid);
                Merger second = new Merger(input, mid, high);
                first.fork();
                long[] secondResult = second.compute();
                long[] firstResult = first.join();

                result = mergeTwoSortedArrays(firstResult, secondResult);
            }

            return result;
        }

        // method to merge two sorted arrays
        private long[] mergeTwoSortedArrays(long[] a, long[] b){
            long[] result = new long[a.length + b.length];
            int i=0;
            int j=0;
            int k=0;
                while(i<a.length && j<b.length){
                    if(a[i] < b[j]){
                        result[k] = a[i];
                        i++;
                    } else{
                        result[k] = b[j];
                        j++;
                    }
                    k++;
                }

                while(i<a.length){
                    result[k] = a[i];
                    i++;
                    k++;
                }

                while(j<b.length){
                    result[k] = b[j];
                    j++;
                    k++;
                }

        return result;
    }
    }
}
0 голосов
/ 07 июля 2011

Последние пару дней я сам сталкивался с проблемой многопоточной сортировки. Как объяснено на этом слайде Caltech лучшее, что вы можете сделать, просто многопоточность каждого шага подходов разделяй и властвуй по очевидному количеству потоков (количество делений) ограничено. Я полагаю, это потому, что хотя вы можете запустить 64 подразделения в 64 потоках, используя все 64 ядра вашей машины, 4 подразделения можно запустить только в 4 потоках, 2 на 2, 1 на 1 и т. Д. Так что для многих уровней рекурсии ваша машина используется недостаточно.

Прошлой ночью мне пришло решение, которое могло бы пригодиться в моей собственной работе, поэтому я опубликую его здесь.

Если первый критерий вашей функции сортировки основан на целом числе максимального размера s, будь то действительное целое число или символ в строке, так что это целое число или символ полностью определяет самый высокий уровень вашего вида, затем Я думаю, что есть очень быстрое (и простое) решение. Просто используйте это начальное целое число, чтобы разделить вашу проблему сортировки на более мелкие проблемы сортировки, и сортируйте те, используя стандартный однопоточный алгоритм сортировки по вашему выбору. Я думаю, что деление на классы можно сделать за один проход. После выполнения независимых сортировок проблем слияния не возникает, поскольку вы уже знаете, что все в классе 1 сортируется до класса 2 и т. Д.

Пример: если вы хотите выполнить сортировку на основе strcmp (), затем используйте первый символ в вашей строке, чтобы разбить ваши данные на 256 классов, затем сортируйте каждый класс в следующем доступном потоке, пока все они не будут завершены.

Этот метод полностью использует все доступные ядра, пока проблема не будет решена, и я думаю, что это легко реализовать. Я еще не реализовал его, поэтому могут возникнуть проблемы, которые мне еще предстоит найти. Он явно не может работать для сортировок с плавающей запятой и был бы неэффективным для больших s. Его производительность также будет сильно зависеть от энтропии целого числа / символа, используемого для определения классов.

Возможно, это то, что Фабиан Стиг предложил в нескольких словах, но я прямо заявляю, что в некоторых обстоятельствах вы можете создать несколько более мелких сортов из более крупного.

0 голосов
/ 20 июня 2010

Как вы думаете, почему параллельная сортировка поможет? Я думаю, что большая часть сортировки связана с вводом / выводом, а не с обработкой. Если ваше сравнение не выполнит много вычислений, ускорение маловероятно.

0 голосов
/ 05 февраля 2010

Вы, вероятно, учитывали это, но это может помочь взглянуть на конкретную проблему с более высокого уровня, например, если вы не сортируете только один массив или список, может быть намного проще сортировать отдельные коллекции одновременно, используя традиционную алгоритм вместо того, чтобы пытаться одновременно сортировать одну коллекцию.

...