Java: распараллеливание быстрой сортировки через многопоточность - PullRequest
6 голосов
/ 06 августа 2010

Я экспериментирую с алгоритмами распараллеливания в Java.Я начал с сортировки слиянием и опубликовал свою попытку в этом вопросе .Моя пересмотренная попытка в приведенном ниже коде, где я сейчас пытаюсь распараллелить быструю сортировку.

Есть ли ошибки в моей многопоточной реализации или подходе к этой проблеме?Если нет, разве я не должен ожидать увеличения скорости более чем на 32% между последовательным и параллельным алгоритмом на дуэльном ядре (см. Время внизу)?

Вот алгоритм многопоточности:

    public class ThreadedQuick extends Thread
    {
        final int MAX_THREADS = Runtime.getRuntime().availableProcessors();

        CountDownLatch doneSignal;
        static int num_threads = 1;

        int[] my_array;
        int start, end;

        public ThreadedQuick(CountDownLatch doneSignal, int[] array, int start, int end) {
            this.my_array = array;
            this.start = start;
            this.end = end;
            this.doneSignal = doneSignal;
        }

        public static void reset() {
            num_threads = 1;
        }

        public void run() {
            quicksort(my_array, start, end);
            doneSignal.countDown();
            num_threads--;
        }

        public void quicksort(int[] array, int start, int end) {
            int len = end-start+1;

            if (len <= 1)
                return;

            int pivot_index = medianOfThree(array, start, end);
            int pivotValue = array[pivot_index];

            swap(array, pivot_index, end);

            int storeIndex = start;
            for (int i = start; i < end; i++) {
               if (array[i] <= pivotValue) {
                   swap(array, i, storeIndex);
                   storeIndex++;
               }
            }

            swap(array, storeIndex, end);

            if (num_threads < MAX_THREADS) {
                num_threads++;

                CountDownLatch completionSignal = new CountDownLatch(1);

                new ThreadedQuick(completionSignal, array, start, storeIndex - 1).start();
                quicksort(array, storeIndex + 1, end);

                try {
                    completionSignal.await(1000, TimeUnit.SECONDS);
                } catch(Exception ex) {
                    ex.printStackTrace();
                }
            } else {
                quicksort(array, start, storeIndex - 1);
                quicksort(array, storeIndex + 1, end);
            }
        }
    }

Вот как я его запускаю:

ThreadedQuick.reset();
CountDownLatch completionSignal = new CountDownLatch(1);
new ThreadedQuick(completionSignal, array, 0, array.length-1).start();
try {
    completionSignal.await(1000, TimeUnit.SECONDS);
} catch(Exception ex){
    ex.printStackTrace();
}

Я проверял это на Arrays.sort и аналогичных последовательных быстрыхалгоритм сортировки.Вот результаты синхронизации на ноутбуке Intel Duel-Core dell, в секундах:

Элементы: 500 000, последовательные: 0,068592, резьбовые: 0,046871, Arrays.sort: 0,079677

Элементы: 1 000 000,последовательно: 0,14416, с резьбой: 0,095492, Arrays.sort: 0,167155

Элементы: 2 000 000, последовательно: 0,301666, с резьбой: 0,205719, Arrays.sort: 0,350982

Элементы: 4 000 000, последовательно: 0,623291,с резьбой: 0,424119, Arrays.sort: 0,712698

Элементы: 8 000 000, последовательно: 1,279374, с резьбой: 0,859363, Arrays.sort: 1.487671

Каждое число выше - среднее время 100 тестов, бросокиз 3 самых низких и 3 самых высоких случаев.Я использовал Random.nextInt (Integer.MAX_VALUE) для генерации массива для каждого теста, который инициализировался один раз каждые 10 тестов с одним и тем же начальным числом.Каждый тест состоял из синхронизации данного алгоритма с System.nanoTime.Я округлил до шести десятичных знаков после усреднения.И, конечно же, я проверил, работает ли каждый сорт .

Как вы можете видеть, скорость между последовательными и многопоточными случаями увеличивается примерно на 32% в каждом наборе тестов.,Как я уже говорил выше, разве я не должен ожидать большего?

Ответы [ 3 ]

10 голосов
/ 08 августа 2010

Создание статического значения numThreads может вызвать проблемы, весьма вероятно, что в какой-то момент вы получите более MAX_THREADS.

Вероятно, причина того, что вы не получаете полного удвоения производительности, заключается в том, что ваша быстрая сортировка не может быть полностью распараллелена. Обратите внимание, что при первом вызове быстрой сортировки будет проходить весь массив в начальном потоке, прежде чем он начнет работать параллельно. Кроме того, при параллельном обработке отдельных потоков возникают трудности с параллелизацией алгоритма в виде переключения контекста и переключения режимов.

Посмотрите на инфраструктуру Fork / Join, эта проблема, вероятно, вполне уместна.

Пара моментов реализации. Реализуйте Runnable вместо расширения Thread. Расширение Thread следует использовать только при создании новой версии класса Thread. Когда вы просто хотите выполнить какую-то работу для параллельной работы, вам лучше использовать Runnable. В то же время, выполняя Runnable, вы также можете расширить другой класс, который дает вам больше гибкости в дизайне ОО. Используйте пул потоков, который ограничен количеством потоков, доступных в системе. Также не используйте numThreads, чтобы принять решение о том, следует ли создавать новый поток или нет. Вы можете рассчитать это заранее. Используйте минимальный размер раздела, который равен размеру всего массива, деленному на количество доступных процессоров. Что-то вроде:

public class ThreadedQuick implements Runnable {

    public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors();
    static final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);

    final int[] my_array;
    final int start, end;

    private final int minParitionSize;

    public ThreadedQuick(int minParitionSize, int[] array, int start, int end) {
        this.minParitionSize = minParitionSize;
        this.my_array = array;
        this.start = start;
        this.end = end;
    }

    public void run() {
        quicksort(my_array, start, end);
    }

    public void quicksort(int[] array, int start, int end) {
        int len = end - start + 1;

        if (len <= 1)
            return;

        int pivot_index = medianOfThree(array, start, end);
        int pivotValue = array[pivot_index];

        swap(array, pivot_index, end);

        int storeIndex = start;
        for (int i = start; i < end; i++) {
            if (array[i] <= pivotValue) {
                swap(array, i, storeIndex);
                storeIndex++;
            }
        }

        swap(array, storeIndex, end);

        if (len > minParitionSize) {

            ThreadedQuick quick = new ThreadedQuick(minParitionSize, array, start, storeIndex - 1);
            Future<?> future = executor.submit(quick);
            quicksort(array, storeIndex + 1, end);

            try {
                future.get(1000, TimeUnit.SECONDS);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        } else {
            quicksort(array, start, storeIndex - 1);
            quicksort(array, storeIndex + 1, end);
        }
    }    
}

Вы можете запустить его, выполнив:

ThreadedQuick quick = new ThreadedQuick(array / ThreadedQuick.MAX_THREADS, array, 0, array.length - 1);
quick.run();

Это запустит сортировку в том же потоке, что позволит избежать ненужного прыжка потока при запуске.

Предостережение: Не уверен, что приведенная выше реализация будет на самом деле быстрее, поскольку я не тестировал ее.

3 голосов
/ 12 августа 2011

Используется комбинация быстрой сортировки и сортировки слиянием.

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSortMain {
    public static void main(String... args) throws InterruptedException {
        Random rand = new Random();
        final int[] values = new int[100*1024*1024];
        for (int i = 0; i < values.length; i++)
            values[i] = rand.nextInt();

        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService es = Executors.newFixedThreadPool(threads);
        int blockSize = (values.length + threads - 1) / threads;
        for (int i = 0; i < values.length; i += blockSize) {
            final int min = i;
            final int max = Math.min(min + blockSize, values.length);
            es.submit(new Runnable() {
                @Override
                public void run() {
                    Arrays.sort(values, min, max);
                }
            });
        }
        es.shutdown();
        es.awaitTermination(10, TimeUnit.MINUTES);
        for (int blockSize2 = blockSize; blockSize2 < values.length / 2; blockSize2 *= 2) {
            for (int i = 0; i < values.length; i += blockSize2) {
                final int min = i;
                final int mid = Math.min(min + blockSize2, values.length);
                final int max = Math.min(min + blockSize2 * 2, values.length);
                mergeSort(values, min, mid, max);
            }
        }
    }

    private static boolean mergeSort(int[] values, int left, int mid, int end) {
        int[] results = new int[end - left];
        int l = left, r = mid, m = 0;
        for (; l < left && r < mid; m++) {
            int lv = values[l];
            int rv = values[r];
            if (lv < rv) {
                results[m] = lv;
                l++;
            } else {
                results[m] = rv;
                r++;
            }
        }
        while (l < mid)
            results[m++] = values[l++];
        while (r < end)
            results[m++] = values[r++];
        System.arraycopy(results, 0, values, left, results.length);
        return false;
    }
}
1 голос
/ 06 августа 2010

Пара комментариев, если я правильно понял ваш код:

  1. Я не вижу блокировки вокруг объекта numthreads, даже если к нему можно получить доступ через несколько потоков. Возможно, вам следует сделать это AtomicInteger.

  2. Используйте пул потоков и организуйте задачи, т. Е. Один вызов быстрой сортировки, чтобы выполнить продвижение пула потоков. Используйте Фьючерсы.

Ваш текущий метод деления вещей, как вы делаете, может оставить меньшее деление с потоком и большее деление без потока. Другими словами, он не назначает большие сегменты своим собственным потокам.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...