Не зная больше о реализации, вот мои предложения и / или комментарии:
Ограничение количества потоков, которые могут выполняться в любой момент времени. Пергапс8 или 10 (возможно, для того, чтобы дать планировщику больше свободы, хотя лучше всего поставить по одному на ядро / нить потока ).На самом деле нет смысла запускать дополнительные потоки для «пропускной способности» при проблеме с процессором, если сходство не поддерживает это.
Не создавайте потоков рядом суходит !!! только нить на больших ветках.Нет смысла создавать поток для сортировки относительно небольшого количества элементов, и на этом уровне есть много, много маленьких веток!Потоки добавили бы сюда намного больше относительных накладных расходов.(Это похоже на переключение в «простую сортировку» для листьев.)
Убедитесь, что каждый поток может работать изолированно - не должен давить на другойпоток во время работы -> без блокировок, просто дождитесь соединения .Разделяй и властвуй.
Возможно, посмотрите на выполнение подхода "в ширину" для порождения потоков.
Рассмотрим сортировку слиянием по быстрой сортировке (Я склонен к слиянию :-) Помните, что есть многочисленные различные виды слияний, включая восходящие.
Редактировать
- Убедитесь, что он действительно работает. Не забудьте правильно использовать барьеры памяти между потоками - требуется, даже если никакие два потока не изменяют одни и те же данные одновременно, чтобы обеспечить правильную видимость.
Редактировать (подтверждение концепции):
Я собрал эту простую демонстрацию.На моем Intel Core2 Duo @ 2Ghz я мог запустить его примерно в 2/3 - 3/4 раза, что определенно некоторое улучшение :) (Настройки: DATA_SIZE = 3000000, MAX_THREADS = 4, MIN_PARALLEL = 1000).Это с базовым кодом быстрой сортировки, извлеченным из Википедии, который не использует никаких других базовых оптимизаций.
Метод, в котором он определяет, может ли / должен ли быть запущен новый поток, также очень примитивен -если нет нового потока, он просто пускает в ход (потому что, знаете, зачем ждать?)
Этот код также должен (надеюсь) разветвляться с потоками.Это может быть менее эффективно для локальности данных, чем сохранение глубины, но модель казалась достаточно простой, если в моей голове.
Служба исполнителя также используется для упрощения проекта и возможности повторного использования того же самого.темы (против порождения новых тем).MIN_PARALLEL может стать довольно маленьким (скажем, около 20), прежде чем начнут отображаться служебные данные исполнителя - максимальное количество потоков и, вероятно, только использование-нового-потока-если-возможно, также контролирует это.
qsort average seconds: 0.6290541056
pqsort average seconds: 0.4513915392
Я не даю абсолютно никаких гарантий относительно полезности или правильности этого кода, но он, кажется, работает здесь.Обратите внимание на предупреждение рядом с ThreadPoolExecutor, поскольку оно ясно показывает, что я не совсем уверен в том, что происходит :-) Я вполне уверен, что в дизайне есть некоторые недостатки в недостаточном использовании потоков.
package psq;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
public class Main {
int[] genData (int len) {
Random r = new Random();
int[] newData = new int[len];
for (int i = 0; i < newData.length; i++) {
newData[i] = r.nextInt();
}
return newData;
}
boolean check (int[] arr) {
if (arr.length == 0) {
return true;
}
int lastValue = arr[0];
for (int i = 1; i < arr.length; i++) {
//System.out.println(arr[i]);
if (arr[i] < lastValue) {
return false;
}
lastValue = arr[i];
}
return true;
}
int partition (int[] arr, int left, int right, int pivotIndex) {
// pivotValue := array[pivotIndex]
int pivotValue = arr[pivotIndex];
{
// swap array[pivotIndex] and array[right] // Move pivot to end
int t = arr[pivotIndex];
arr[pivotIndex] = arr[right];
arr[right] = t;
}
// storeIndex := left
int storeIndex = left;
// for i from left to right - 1 // left ≤ i < right
for (int i = left; i < right; i++) {
//if array[i] ≤ pivotValue
if (arr[i] <= pivotValue) {
//swap array[i] and array[storeIndex]
//storeIndex := storeIndex + 1
int t = arr[i];
arr[i] = arr[storeIndex];
arr[storeIndex] = t;
storeIndex++;
}
}
{
// swap array[storeIndex] and array[right] // Move pivot to its final place
int t = arr[storeIndex];
arr[storeIndex] = arr[right];
arr[right] = t;
}
// return storeIndex
return storeIndex;
}
void quicksort (int[] arr, int left, int right) {
// if right > left
if (right > left) {
// select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
int pivotIndex = left + (right - left) / 2;
// pivotNewIndex := partition(array, left, right, pivotIndex)
int pivotNewIndex = partition(arr, left, right, pivotIndex);
// quicksort(array, left, pivotNewIndex - 1)
// quicksort(array, pivotNewIndex + 1, right)
quicksort(arr, left, pivotNewIndex - 1);
quicksort(arr, pivotNewIndex + 1, right);
}
}
static int DATA_SIZE = 3000000;
static int MAX_THREADS = 4;
static int MIN_PARALLEL = 1000;
// NOTE THAT THE THREAD POOL EXECUTER USES A LINKEDBLOCKINGQUEUE
// That is, because it's possible to OVER SUBMIT with this code,
// even with the semaphores!
ThreadPoolExecutor tp = new ThreadPoolExecutor(
MAX_THREADS,
MAX_THREADS,
Long.MAX_VALUE,
TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>());
// if there are no semaphore available then then we just continue
// processing from the same thread and "deal with it"
Semaphore sem = new Semaphore(MAX_THREADS, false);
class QuickSortAction implements Runnable {
int[] arr;
int left;
int right;
public QuickSortAction (int[] arr, int left, int right) {
this.arr = arr;
this.left = left;
this.right = right;
}
public void run () {
try {
//System.out.println(">>[" + left + "|" + right + "]");
pquicksort(arr, left, right);
//System.out.println("<<[" + left + "|" + right + "]");
} catch (Exception ex) {
// I got nothing for this
throw new RuntimeException(ex);
}
}
}
// pquicksort
// threads will [hopefully] fan-out "breadth-wise"
// this is because it's likely that the 2nd executer (if needed)
// will be submitted prior to the 1st running and starting its own executors
// of course this behavior is not terribly well-define
void pquicksort (int[] arr, int left, int right) throws ExecutionException, InterruptedException {
if (right > left) {
// memory barrier -- pquicksort is called from different threads
synchronized (arr) {}
int pivotIndex = left + (right - left) / 2;
int pivotNewIndex = partition(arr, left, right, pivotIndex);
Future<?> f1 = null;
Future<?> f2 = null;
if ((pivotNewIndex - 1) - left > MIN_PARALLEL) {
if (sem.tryAcquire()) {
f1 = tp.submit(new QuickSortAction(arr, left, pivotNewIndex - 1));
} else {
pquicksort(arr, left, pivotNewIndex - 1);
}
} else {
quicksort(arr, left, pivotNewIndex - 1);
}
if (right - (pivotNewIndex + 1) > MIN_PARALLEL) {
if (sem.tryAcquire()) {
f2 = tp.submit(new QuickSortAction(arr, pivotNewIndex + 1, right));
} else {
pquicksort(arr, pivotNewIndex + 1, right);
}
} else {
quicksort(arr, pivotNewIndex + 1, right);
}
// join back up
if (f1 != null) {
f1.get();
sem.release();
}
if (f2 != null) {
f2.get();
sem.release();
}
}
}
long qsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
quicksort(data, 0, data.length - 1);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("qsort not sorted!");
}
return duration;
}
long pqsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
pquicksort(data, 0, data.length - 1);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("pqsort not sorted!");
}
return duration;
}
public Main () throws Exception {
long qsort_duration = 0;
long pqsort_duration = 0;
int ITERATIONS = 10;
for (int i = 0; i < ITERATIONS; i++) {
System.out.println("Iteration# " + i);
int[] data = genData(DATA_SIZE);
if ((i & 1) == 0) {
qsort_duration += qsort_call(data);
pqsort_duration += pqsort_call(data);
} else {
pqsort_duration += pqsort_call(data);
qsort_duration += qsort_call(data);
}
}
System.out.println("====");
System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
}
public static void main(String[] args) throws Exception {
new Main();
}
}
YMMV.Удачного кодирования.
(Кроме того, я хотел бы знать, как этот - или аналогичный - код ярмарок на вашем 8-ядерном корпусе. Википедия утверждает, что линейное ускорение по количеству процессоров возможно :)
Редактировать (лучше цифры)
Убрано использование фьючерсов, которые вызывали незначительное "застревание" и переключались на один семафор окончательного ожидания: меньше бесполезного ожидания. Теперь выполняется всего за 55% времени без резьбы: -)
qsort average seconds: 0.5999702528
pqsort average seconds: 0.3346969088
(
package psq;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.*;
public class Main {
int[] genData (int len) {
Random r = new Random();
int[] newData = new int[len];
for (int i = 0; i < newData.length; i++) {
newData[i] = r.nextInt();
}
return newData;
}
boolean check (int[] arr) {
if (arr.length == 0) {
return true;
}
int lastValue = arr[0];
for (int i = 1; i < arr.length; i++) {
//System.out.println(arr[i]);
if (arr[i] < lastValue) {
return false;
}
lastValue = arr[i];
}
return true;
}
int partition (int[] arr, int left, int right, int pivotIndex) {
// pivotValue := array[pivotIndex]
int pivotValue = arr[pivotIndex];
{
// swap array[pivotIndex] and array[right] // Move pivot to end
int t = arr[pivotIndex];
arr[pivotIndex] = arr[right];
arr[right] = t;
}
// storeIndex := left
int storeIndex = left;
// for i from left to right - 1 // left ≤ i < right
for (int i = left; i < right; i++) {
//if array[i] ≤ pivotValue
if (arr[i] <= pivotValue) {
//swap array[i] and array[storeIndex]
//storeIndex := storeIndex + 1
int t = arr[i];
arr[i] = arr[storeIndex];
arr[storeIndex] = t;
storeIndex++;
}
}
{
// swap array[storeIndex] and array[right] // Move pivot to its final place
int t = arr[storeIndex];
arr[storeIndex] = arr[right];
arr[right] = t;
}
// return storeIndex
return storeIndex;
}
void quicksort (int[] arr, int left, int right) {
// if right > left
if (right > left) {
// select a pivot index //(e.g. pivotIndex := left + (right - left)/2)
int pivotIndex = left + (right - left) / 2;
// pivotNewIndex := partition(array, left, right, pivotIndex)
int pivotNewIndex = partition(arr, left, right, pivotIndex);
// quicksort(array, left, pivotNewIndex - 1)
// quicksort(array, pivotNewIndex + 1, right)
quicksort(arr, left, pivotNewIndex - 1);
quicksort(arr, pivotNewIndex + 1, right);
}
}
static int DATA_SIZE = 3000000;
static int MAX_EXTRA_THREADS = 7;
static int MIN_PARALLEL = 500;
// To get to reducePermits
@SuppressWarnings("serial")
class Semaphore2 extends Semaphore {
public Semaphore2(int permits, boolean fair) {
super(permits, fair);
}
public void removePermit() {
super.reducePermits(1);
}
}
class QuickSortAction implements Runnable {
final int[] arr;
final int left;
final int right;
final SortState ss;
public QuickSortAction (int[] arr, int left, int right, SortState ss) {
this.arr = arr;
this.left = left;
this.right = right;
this.ss = ss;
}
public void run () {
try {
//System.out.println(">>[" + left + "|" + right + "]");
pquicksort(arr, left, right, ss);
//System.out.println("<<[" + left + "|" + right + "]");
ss.limit.release();
ss.countdown.release();
} catch (Exception ex) {
// I got nothing for this
throw new RuntimeException(ex);
}
}
}
class SortState {
final public ThreadPoolExecutor pool = new ThreadPoolExecutor(
MAX_EXTRA_THREADS,
MAX_EXTRA_THREADS,
Long.MAX_VALUE,
TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>());
// actual limit: executor may actually still have "active" things to process
final public Semaphore limit = new Semaphore(MAX_EXTRA_THREADS, false);
final public Semaphore2 countdown = new Semaphore2(1, false);
}
void pquicksort (int[] arr) throws Exception {
SortState ss = new SortState();
pquicksort(arr, 0, arr.length - 1, ss);
ss.countdown.acquire();
}
// pquicksort
// threads "fork" if available.
void pquicksort (int[] arr, int left, int right, SortState ss) throws ExecutionException, InterruptedException {
if (right > left) {
// memory barrier -- pquicksort is called from different threads
// and those threads may be created because they are in an executor
synchronized (arr) {}
int pivotIndex = left + (right - left) / 2;
int pivotNewIndex = partition(arr, left, right, pivotIndex);
{
int newRight = pivotNewIndex - 1;
if (newRight - left > MIN_PARALLEL) {
if (ss.limit.tryAcquire()) {
ss.countdown.removePermit();
ss.pool.submit(new QuickSortAction(arr, left, newRight, ss));
} else {
pquicksort(arr, left, newRight, ss);
}
} else {
quicksort(arr, left, newRight);
}
}
{
int newLeft = pivotNewIndex + 1;
if (right - newLeft > MIN_PARALLEL) {
if (ss.limit.tryAcquire()) {
ss.countdown.removePermit();
ss.pool.submit(new QuickSortAction(arr, newLeft, right, ss));
} else {
pquicksort(arr, newLeft, right, ss);
}
} else {
quicksort(arr, newLeft, right);
}
}
}
}
long qsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
quicksort(data, 0, data.length - 1);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("qsort not sorted!");
}
return duration;
}
long pqsort_call (int[] origData) throws Exception {
int[] data = Arrays.copyOf(origData, origData.length);
long start = System.nanoTime();
pquicksort(data);
long duration = System.nanoTime() - start;
if (!check(data)) {
throw new Exception("pqsort not sorted!");
}
return duration;
}
public Main () throws Exception {
long qsort_duration = 0;
long pqsort_duration = 0;
int ITERATIONS = 10;
for (int i = 0; i < ITERATIONS; i++) {
System.out.println("Iteration# " + i);
int[] data = genData(DATA_SIZE);
if ((i & 1) == 0) {
qsort_duration += qsort_call(data);
pqsort_duration += pqsort_call(data);
} else {
pqsort_duration += pqsort_call(data);
qsort_duration += qsort_call(data);
}
}
System.out.println("====");
System.out.println("qsort average seconds: " + (float)qsort_duration / (ITERATIONS * 1E9));
System.out.println("pqsort average seconds: " + (float)pqsort_duration / (ITERATIONS * 1E9));
}
public static void main(String[] args) throws Exception {
new Main();
}
}