Распараллеливание быстрой сортировки замедляет - PullRequest
5 голосов
/ 04 февраля 2011

Я выполняю быструю сортировку по очень большому количеству данных и для развлечения пытаюсь распараллелить их, чтобы ускорить сортировку. Однако в текущей форме многопоточная версия медленнее однопоточной версии из-за удушающих точек синхронизации.
Каждый раз, когда я порождаю поток, я получаю блокировку для int и увеличиваю ее, и каждый раз, когда поток завершается, я снова получаю блокировку и декремент, в дополнение к проверке, есть ли еще какие-либо выполняемые потоки (int> 0). Если нет, я просыпаюсь в главном потоке и работаю с отсортированными данными.

Я уверен, что есть лучший способ сделать это. Не уверен, что это все же. Помощь очень ценится.

EDIT: Думаю, я не предоставил достаточно информации.
Это Java-код на окто-ядре Opteron. Я не могу переключать языки.
Объем, который я сортирую, помещается в память, и он уже существует в памяти в момент вызова быстрой сортировки, поэтому нет смысла записывать его на диск только для чтения обратно в память.
Под «получить блокировку» я подразумеваю наличие синхронизированного блока целого числа.

Ответы [ 3 ]

8 голосов
/ 04 февраля 2011

Не зная больше о реализации, вот мои предложения и / или комментарии:

  1. Ограничение количества потоков, которые могут выполняться в любой момент времени. Пергапс8 или 10 (возможно, для того, чтобы дать планировщику больше свободы, хотя лучше всего поставить по одному на ядро ​​/ нить потока ).На самом деле нет смысла запускать дополнительные потоки для «пропускной способности» при проблеме с процессором, если сходство не поддерживает это.

  2. Не создавайте потоков рядом суходит !!! только нить на больших ветках.Нет смысла создавать поток для сортировки относительно небольшого количества элементов, и на этом уровне есть много, много маленьких веток!Потоки добавили бы сюда намного больше относительных накладных расходов.(Это похоже на переключение в «простую сортировку» для листьев.)

  3. Убедитесь, что каждый поток может работать изолированно - не должен давить на другойпоток во время работы -> без блокировок, просто дождитесь соединения .Разделяй и властвуй.

  4. Возможно, посмотрите на выполнение подхода "в ширину" для порождения потоков.

  5. Рассмотрим сортировку слиянием по быстрой сортировке (Я склонен к слиянию :-) Помните, что есть многочисленные различные виды слияний, включая восходящие.

Редактировать

  1. Убедитесь, что он действительно работает. Не забудьте правильно использовать барьеры памяти между потоками - требуется, даже если никакие два потока не изменяют одни и те же данные одновременно, чтобы обеспечить правильную видимость.

Редактировать (подтверждение концепции):

Я собрал эту простую демонстрацию.На моем Intel Core2 Duo @ 2Ghz я мог запустить его примерно в 2/3 - 3/4 раза, что определенно некоторое улучшение :) (Настройки: DATA_SIZE = 3000000, MAX_THREADS = 4, MIN_PARALLEL = 1000).Это с базовым кодом быстрой сортировки, извлеченным из Википедии, который не использует никаких других базовых оптимизаций.

Метод, в котором он определяет, может ли / должен ли быть запущен новый поток, также очень примитивен -если нет нового потока, он просто пускает в ход (потому что, знаете, зачем ждать?)

Этот код также должен (надеюсь) разветвляться с потоками.Это может быть менее эффективно для локальности данных, чем сохранение глубины, но модель казалась достаточно простой, если в моей голове.

Служба исполнителя также используется для упрощения проекта и возможности повторного использования того же самого.темы (против порождения новых тем).MIN_PARALLEL может стать довольно маленьким (скажем, около 20), прежде чем начнут отображаться служебные данные исполнителя - максимальное количество потоков и, вероятно, только использование-нового-потока-если-возможно, также контролирует это.

qsort average seconds: 0.6290541056
pqsort average seconds: 0.4513915392

Я не даю абсолютно никаких гарантий относительно полезности или правильности этого кода, но он, кажется, работает здесь.Обратите внимание на предупреждение рядом с ThreadPoolExecutor, поскольку оно ясно показывает, что я не совсем уверен в том, что происходит :-) Я вполне уверен, что в дизайне есть некоторые недостатки в недостаточном использовании потоков.

package psq;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

public class Main {

    int[] genData (int len) {
        Random r = new Random();
        int[] newData = new int[len];
        for (int i = 0; i < newData.length; i++) {
            newData[i] = r.nextInt();
        }
        return newData;
    }      

    boolean check (int[] arr) {
        if (arr.length == 0) {
            return true;
        }
        int lastValue = arr[0];
        for (int i = 1; i < arr.length; i++) {
            //System.out.println(arr[i]);
            if (arr[i] < lastValue) {
                return false;
            }
            lastValue = arr[i];
        }
        return true;
    }

    int partition (int[] arr, int left, int right, int pivotIndex) {
        // pivotValue := array[pivotIndex]
        int pivotValue = arr[pivotIndex];
        {
            // swap array[pivotIndex] and array[right] // Move pivot to end
            int t = arr[pivotIndex];
            arr[pivotIndex] = arr[right];
            arr[right] = t;
        }
        // storeIndex := left
        int storeIndex = left;
        // for i  from  left to right - 1 // left ≤ i < right
        for (int i = left; i < right; i++) {
            //if array[i] ≤ pivotValue
            if (arr[i] <= pivotValue) {
                //swap array[i] and array[storeIndex]
                //storeIndex := storeIndex + 1            
                int t = arr[i];
                arr[i] = arr[storeIndex];
                arr[storeIndex] = t;
                storeIndex++;                   
            }
        }
        {
            // swap array[storeIndex] and array[right] // Move pivot to its final place
            int t = arr[storeIndex];
            arr[storeIndex] = arr[right];
            arr[right] = t;
        }
        // return storeIndex
        return storeIndex;
    }

    void quicksort (int[] arr, int left, int right) {
        // if right > left
        if (right > left) {            
            // select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
            int pivotIndex = left + (right - left) / 2;
            // pivotNewIndex := partition(array, left, right, pivotIndex)
            int pivotNewIndex = partition(arr, left, right, pivotIndex);
            // quicksort(array, left, pivotNewIndex - 1)
            // quicksort(array, pivotNewIndex + 1, right)
            quicksort(arr, left, pivotNewIndex - 1);
            quicksort(arr, pivotNewIndex + 1, right);
        }
    }

    static int DATA_SIZE = 3000000;
    static int MAX_THREADS = 4;
    static int MIN_PARALLEL = 1000;

    // NOTE THAT THE THREAD POOL EXECUTER USES A LINKEDBLOCKINGQUEUE
    // That is, because it's possible to OVER SUBMIT with this code,
    // even with the semaphores!
    ThreadPoolExecutor tp = new ThreadPoolExecutor(
            MAX_THREADS,
            MAX_THREADS,
            Long.MAX_VALUE,
            TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());
    // if there are no semaphore available then then we just continue
    // processing from the same thread and "deal with it"
    Semaphore sem = new Semaphore(MAX_THREADS, false); 

    class QuickSortAction implements Runnable {
        int[] arr;
        int left;
        int right;

        public QuickSortAction (int[] arr, int left, int right) {
            this.arr = arr;
            this.left = left;
            this.right = right;
        }

        public void run () {
            try {
                //System.out.println(">>[" + left + "|" + right + "]");
                pquicksort(arr, left, right);
                //System.out.println("<<[" + left + "|" + right + "]");
            } catch (Exception ex) {
                // I got nothing for this
                throw new RuntimeException(ex); 
            }
        }

    }

    // pquicksort
    // threads will [hopefully] fan-out "breadth-wise"
    // this is because it's likely that the 2nd executer (if needed)
    // will be submitted prior to the 1st running and starting its own executors
    // of course this behavior is not terribly well-define
    void pquicksort (int[] arr, int left, int right) throws ExecutionException, InterruptedException {
        if (right > left) {
            // memory barrier -- pquicksort is called from different threads
            synchronized (arr) {}

            int pivotIndex = left + (right - left) / 2;
            int pivotNewIndex = partition(arr, left, right, pivotIndex);

            Future<?> f1 = null;
            Future<?> f2 = null;

            if ((pivotNewIndex - 1) - left > MIN_PARALLEL) {
                if (sem.tryAcquire()) {
                    f1 = tp.submit(new QuickSortAction(arr, left, pivotNewIndex - 1));
                } else {
                    pquicksort(arr, left, pivotNewIndex - 1);
                }
            } else {
                quicksort(arr, left, pivotNewIndex - 1);
            }
            if (right - (pivotNewIndex + 1) > MIN_PARALLEL) {
                if (sem.tryAcquire()) {
                    f2 = tp.submit(new QuickSortAction(arr, pivotNewIndex + 1, right));
                } else {
                    pquicksort(arr, pivotNewIndex + 1, right);
                }
            } else {
                quicksort(arr, pivotNewIndex + 1, right);
            }

            // join back up
            if (f1 != null) {
                f1.get();
                sem.release();
            }
            if (f2 != null) {
                f2.get();
                sem.release();
            }
        }        
    }

    long qsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        quicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("qsort not sorted!");
        }
        return duration;
    }

    long pqsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        pquicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("pqsort not sorted!");
        }
        return duration;
    }

    public Main () throws Exception {
        long qsort_duration = 0;
        long pqsort_duration = 0;
        int ITERATIONS = 10;
        for (int i = 0; i < ITERATIONS; i++) {
            System.out.println("Iteration# " + i);
            int[] data = genData(DATA_SIZE);
            if ((i & 1) == 0) {
                qsort_duration += qsort_call(data);
                pqsort_duration += pqsort_call(data);
            } else {
                pqsort_duration += pqsort_call(data);
                qsort_duration += qsort_call(data);
            }
        }
        System.out.println("====");
        System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
        System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
    }

    public static void main(String[] args) throws Exception {
        new Main();
    }

}

YMMV.Удачного кодирования.

(Кроме того, я хотел бы знать, как этот - или аналогичный - код ярмарок на вашем 8-ядерном корпусе. Википедия утверждает, что линейное ускорение по количеству процессоров возможно :)

Редактировать (лучше цифры)

Убрано использование фьючерсов, которые вызывали незначительное "застревание" и переключались на один семафор окончательного ожидания: меньше бесполезного ожидания. Теперь выполняется всего за 55% времени без резьбы: -)

qsort average seconds: 0.5999702528
pqsort average seconds: 0.3346969088

(

package psq;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;

public class Main {

    int[] genData (int len) {
        Random r = new Random();
        int[] newData = new int[len];
        for (int i = 0; i < newData.length; i++) {
            newData[i] = r.nextInt();
        }
        return newData;
    }      

    boolean check (int[] arr) {
        if (arr.length == 0) {
            return true;
        }
        int lastValue = arr[0];
        for (int i = 1; i < arr.length; i++) {
            //System.out.println(arr[i]);
            if (arr[i] < lastValue) {
                return false;
            }
            lastValue = arr[i];
        }
        return true;
    }

    int partition (int[] arr, int left, int right, int pivotIndex) {
        // pivotValue := array[pivotIndex]
        int pivotValue = arr[pivotIndex];
        {
            // swap array[pivotIndex] and array[right] // Move pivot to end
            int t = arr[pivotIndex];
            arr[pivotIndex] = arr[right];
            arr[right] = t;
        }
        // storeIndex := left
        int storeIndex = left;
        // for i  from  left to right - 1 // left ≤ i < right
        for (int i = left; i < right; i++) {
            //if array[i] ≤ pivotValue
            if (arr[i] <= pivotValue) {
                //swap array[i] and array[storeIndex]
                //storeIndex := storeIndex + 1            
                int t = arr[i];
                arr[i] = arr[storeIndex];
                arr[storeIndex] = t;
                storeIndex++;                   
            }
        }
        {
            // swap array[storeIndex] and array[right] // Move pivot to its final place
            int t = arr[storeIndex];
            arr[storeIndex] = arr[right];
            arr[right] = t;
        }
        // return storeIndex
        return storeIndex;
    }

    void quicksort (int[] arr, int left, int right) {
        // if right > left
        if (right > left) {            
            // select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
            int pivotIndex = left + (right - left) / 2;
            // pivotNewIndex := partition(array, left, right, pivotIndex)
            int pivotNewIndex = partition(arr, left, right, pivotIndex);
            // quicksort(array, left, pivotNewIndex - 1)
            // quicksort(array, pivotNewIndex + 1, right)
            quicksort(arr, left, pivotNewIndex - 1);
            quicksort(arr, pivotNewIndex + 1, right);
        }
    }

    static int DATA_SIZE = 3000000;
    static int MAX_EXTRA_THREADS = 7;
    static int MIN_PARALLEL = 500;

    // To get to reducePermits
    @SuppressWarnings("serial")
    class Semaphore2 extends Semaphore {
        public Semaphore2(int permits, boolean fair) {
            super(permits, fair);
        }
        public void removePermit() {
            super.reducePermits(1);
        }
    }

    class QuickSortAction implements Runnable {
        final int[] arr;
        final int left;
        final int right;
        final SortState ss;

        public QuickSortAction (int[] arr, int left, int right, SortState ss) {
            this.arr = arr;
            this.left = left;
            this.right = right;
            this.ss = ss;
        }

        public void run () {
            try {
                //System.out.println(">>[" + left + "|" + right + "]");
                pquicksort(arr, left, right, ss);
                //System.out.println("<<[" + left + "|" + right + "]");
                ss.limit.release();
                ss.countdown.release();
            } catch (Exception ex) {
                // I got nothing for this
                throw new RuntimeException(ex); 
            }
        }

    }

    class SortState {
        final public ThreadPoolExecutor pool = new ThreadPoolExecutor(
            MAX_EXTRA_THREADS,
            MAX_EXTRA_THREADS,
            Long.MAX_VALUE,
            TimeUnit.NANOSECONDS,
            new LinkedBlockingQueue<Runnable>());
        // actual limit: executor may actually still have "active" things to process
        final public Semaphore limit = new Semaphore(MAX_EXTRA_THREADS, false); 
        final public Semaphore2 countdown = new Semaphore2(1, false); 
    }

    void pquicksort (int[] arr) throws Exception {
        SortState ss = new SortState();
        pquicksort(arr, 0, arr.length - 1, ss);
        ss.countdown.acquire();
    }

    // pquicksort
    // threads "fork" if available.
    void pquicksort (int[] arr, int left, int right, SortState ss) throws ExecutionException, InterruptedException {
        if (right > left) {
            // memory barrier -- pquicksort is called from different threads
            // and those threads may be created because they are in an executor
            synchronized (arr) {}

            int pivotIndex = left + (right - left) / 2;
            int pivotNewIndex = partition(arr, left, right, pivotIndex);

            {
                int newRight = pivotNewIndex - 1;
                if (newRight - left > MIN_PARALLEL) {
                    if (ss.limit.tryAcquire()) {
                        ss.countdown.removePermit();
                        ss.pool.submit(new QuickSortAction(arr, left, newRight, ss));
                    } else {
                        pquicksort(arr, left, newRight, ss);
                    }
                } else {
                    quicksort(arr, left, newRight);
                }
            }

            {
                int newLeft = pivotNewIndex + 1;
                if (right - newLeft > MIN_PARALLEL) {
                    if (ss.limit.tryAcquire()) {
                        ss.countdown.removePermit();
                        ss.pool.submit(new QuickSortAction(arr, newLeft, right, ss));
                    } else {
                        pquicksort(arr, newLeft, right, ss);
                    }
                } else {
                    quicksort(arr, newLeft, right);
                }
            }

        }        
    }

    long qsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        quicksort(data, 0, data.length - 1);
        long duration = System.nanoTime() - start;
        if (!check(data)) {
            throw new Exception("qsort not sorted!");
        }
        return duration;
    }

    long pqsort_call (int[] origData) throws Exception {
        int[] data = Arrays.copyOf(origData, origData.length);
        long start = System.nanoTime();
        pquicksort(data);
        long duration = System.nanoTime() - start;
        if (!check(data)) {            
            throw new Exception("pqsort not sorted!");
        }
        return duration;
    }

    public Main () throws Exception {
        long qsort_duration = 0;
        long pqsort_duration = 0;
        int ITERATIONS = 10;
        for (int i = 0; i < ITERATIONS; i++) {
            System.out.println("Iteration# " + i);
            int[] data = genData(DATA_SIZE);
            if ((i & 1) == 0) {
                qsort_duration += qsort_call(data);
                pqsort_duration += pqsort_call(data);
            } else {
                pqsort_duration += pqsort_call(data);
                qsort_duration += qsort_call(data);
            }
        }
        System.out.println("====");
        System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
        System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
    }

    public static void main(String[] args) throws Exception {
        new Main();
    }

}
1 голос
/ 04 февраля 2011

Под «получить блокировку» я подразумеваю наличие синхронизированного блока в целом числе.Если я вас правильно понимаю: вы блокируете каждый элемент, который вы на самом деле сортируете, что звучит как ОЧЕНЬ медленно!

Похоже, вы создаете слишком много потоков ... вымы не сказали нам, сколько потоков вы на самом деле порождаете, но если вы делаете один поток на одно целое число, то это почти наверняка будет медленнее (где почти наверняка это занижение).То, что вы хотели бы сделать, это порождать 8 потоков, так как у вас есть 8 ядер, и «разбить» ваш массив на 8 секций, которые вы будете сортировать по-быстрому, а затем объединять так же, как в оригинальном алгоритме.Вот несколько примеров того, как этого добиться: Многопоточная быстрая сортировка или слияние

1 голос
/ 04 февраля 2011

Нитки дорогие. Не используйте потоки, если у вас нет тонны данных для сортировки. Или вы можете использовать язык, который имеет лучший дизайн для параллелизма. Например. Erlang имеет очень легкие нити, которые могут быть полезны для сортировки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...