Можно ли использовать многопоточность без создания потоков снова и снова? - PullRequest
3 голосов
/ 15 февраля 2011

Сначала и еще раз, спасибо всем, кто уже ответил на мой вопрос. Я не очень опытный программист, и это мой первый опыт работы с многопоточностью.

У меня есть пример, который работает так же, как моя проблема. Я надеюсь, что это может облегчить наше дело здесь.

public class ThreadMeasuring {
private static final int TASK_TIME = 1; //microseconds
private static class Batch implements Runnable {
    CountDownLatch countDown;
    public Batch(CountDownLatch countDown) {
        this.countDown = countDown;
    }

    @Override
    public void run() {         
        long t0 =System.nanoTime();
        long t = 0;
        while(t<TASK_TIME*1e6){ t = System.nanoTime() - t0; }

        if(countDown!=null) countDown.countDown();
    }
}

public static void main(String[] args) {
    ThreadFactory threadFactory = new ThreadFactory() {
        int counter = 1;
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "Executor thread " + (counter++));
            return t;
        }
    };

  // the total duty to be divided in tasks is fixed (problem dependent). 
  // Increase ntasks will mean decrease the task time proportionally. 
  // 4 Is an arbitrary example.
  // This tasks will be executed thousands of times, inside a loop alternating 
  // with serial processing that needs their result and prepare the next ones.
    int ntasks = 4; 
    int nthreads = 2;
    int ncores = Runtime.getRuntime().availableProcessors();
    if (nthreads<ncores) ncores = nthreads;     

    Batch serial = new Batch(null);
    long serialTime = System.nanoTime();
    serial.run();
    serialTime = System.nanoTime() - serialTime;

    ExecutorService executor = Executors.newFixedThreadPool( nthreads, threadFactory );
    CountDownLatch countDown = new CountDownLatch(ntasks);

    ArrayList<Batch> batches = new ArrayList<Batch>();
    for (int i = 0; i < ntasks; i++) {
        batches.add(new Batch(countDown));
    }

    long start = System.nanoTime();
    for (Batch r : batches){
        executor.execute(r);
    }

    // wait for all threads to finish their task
    try {
        countDown.await();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    long tmeasured = (System.nanoTime() - start);

    System.out.println("Task time= " + TASK_TIME + " ms");
    System.out.println("Number of tasks= " + ntasks);
    System.out.println("Number of threads= " + nthreads);
    System.out.println("Number of cores= " + ncores);
    System.out.println("Measured time= " + tmeasured);
    System.out.println("Theoretical serial time= " + TASK_TIME*1000000*ntasks);
    System.out.println("Theoretical parallel time= " + (TASK_TIME*1000000*ntasks)/ncores);
    System.out.println("Speedup= " + (serialTime*ntasks)/(double)tmeasured);

    executor.shutdown();
}
 }

Вместо выполнения вычислений каждая партия просто ожидает некоторое время. Программа рассчитывает ускорение , которое теоретически всегда будет равно 2, но может быть меньше 1 (на самом деле снижение ), если значение TASK_TIME мало.

Мои вычисления занимают 1 мс и обычно быстрее. В течение 1 мс я нахожу небольшое ускорение примерно на 30%, но на практике с моей программой я замечаю, что снижение скорости .

Структура этого кода очень похожа на мою программу, поэтому, если бы вы могли помочь мне оптимизировать обработку потоков, я был бы очень признателен.

С уважением.

Ниже, оригинальный вопрос:

Привет.

Я бы хотел использовать многопоточность в моей программе, так как я считаю, что она может значительно повысить ее эффективность. Большая часть времени его работы обусловлена ​​независимыми расчетами.

Моя программа имеет тысячи независимых вычислений (несколько линейных систем для решения), но они просто происходят одновременно небольшими группами из десятков или около того. Каждой из этих групп потребуется несколько миллисекунд для запуска. После одной из этих групп вычислений программа должна некоторое время запускаться последовательно, а затем мне снова приходится решать линейные системы.

На самом деле, можно видеть, что эти независимые линейные системы, которые нужно решить, находятся внутри цикла, который повторяется тысячи раз, чередуясь с последовательными вычислениями, которые зависят от предыдущих результатов. Моя идея, чтобы ускорить программу, состоит в том, чтобы вычислять эти независимые вычисления в параллельных потоках, разделяя каждую группу (количество процессоров, которые у меня есть) на партии независимых вычислений. Так что, в принципе, очереди вообще нет.

Я пытался использовать FixedThreadPool и CachedThreadPool, и это получалось даже медленнее, чем последовательная обработка. Кажется, что создание новых протекторов занимает слишком много времени каждый раз, когда мне нужно решить партии.

Есть ли лучший способ справиться с этой проблемой? Эти пулы, которые я использовал, кажутся подходящими для случаев, когда каждый поток занимает больше времени вместо тысяч меньших потоков ...

Спасибо! С наилучшими пожеланиями!

Ответы [ 6 ]

5 голосов
/ 15 февраля 2011

Пулы потоков не создают новые потоки снова и снова.Вот почему это пулы.

Сколько потоков вы использовали и сколько у вас процессоров / ядер?Какова нагрузка на систему (обычно, когда вы выполняете их последовательно и когда вы выполняете с пулом)?Включена ли синхронизация или какой-либо другой тип блокировки?

Является ли алгоритм параллельного выполнения точно таким же, как и последовательный (ваше описание, похоже, предполагает, что в последовательном интерфейсе использовались некоторые результаты предыдущей итерации).

1 голос
/ 16 февраля 2011

Из того, что я прочитал: «тысячи независимых вычислений ... происходят одновременно ... для запуска потребуются несколько миллисекунд», мне кажется, что ваша проблема идеально подходит для программирования на GPU.

И я думаю, что это отвечает на ваш вопрос.Программирование на GPU становится все более популярным.Есть привязки Java для CUDA и OpenCL.Если это возможно для вас, я говорю, пойти на это.

1 голос
/ 15 февраля 2011

Если у вас есть проблема, которая не масштабируется до нескольких ядер, вам нужно изменить программу, или у вас есть проблема, которая не так параллельна, как вы думаете.Я подозреваю, что у вас есть какой-то другой тип ошибки, но я не могу сказать, основываясь на предоставленной информации.

Этот тестовый код может помочь.

Time per million tasks 765 ms

код

ExecutorService es = Executors.newFixedThreadPool(4);
Runnable task = new Runnable() {
    @Override
    public void run() {
        // do nothing.
    }
};
long start = System.nanoTime();
for(int i=0;i<1000*1000;i++) {
    es.submit(task);
}
es.shutdown();
es.awaitTermination(10, TimeUnit.SECONDS);
long time = System.nanoTime() - start;
System.out.println("Time per million tasks "+time/1000/1000+" ms");

РЕДАКТИРОВАТЬ: скажем, у вас есть цикл, который последовательно делает это.

for(int i=0;i<1000*1000;i++)
    doWork(i);

Можно предположить, что переход к такому циклу будет быстрее, но проблема в том, что накладные расходы могут быть больше, чем усиление.

for(int i=0;i<1000*1000;i++) {
    final int i2 = i;
    ex.execute(new Runnable() {
        public void run() {
            doWork(i2);
        }
    }
}

Таким образом, вам нужно создать пакеты работы (по крайней мере, одинна поток), поэтому существует достаточно задач, чтобы сохранить занятость всех потоков, но не так много задач, которые ваши потоки тратят на накладные расходы.

final int batchSize = 10*1000;
for(int i=0;i<1000*1000;i+=batchSize) {
    final int i2 = i;
    ex.execute(new Runnable() {
        public void run() {
            for(int i3=i2;i3<i2+batchSize;i3++)
               doWork(i3);
        }
    }
}

РЕДАКТИРОВАТЬ 2: Выполнение теста, копирующего данные между потоками.

for (int i = 0; i < 20; i++) {
    ExecutorService es = Executors.newFixedThreadPool(1);
    final double[] d = new double[4 * 1024];
    Arrays.fill(d, 1);
    final double[] d2 = new double[4 * 1024];
    es.submit(new Runnable() {
        @Override
        public void run() {
            // nothing.
        }
    }).get();
    long start = System.nanoTime();
    es.submit(new Runnable() {
        @Override
        public void run() {
            synchronized (d) {
                System.arraycopy(d, 0, d2, 0, d.length);
            }
        }
    });
    es.shutdown();
    es.awaitTermination(10, TimeUnit.SECONDS);
    // get a the values in d2.
    for (double x : d2) ;
    long time = System.nanoTime() - start;
    System.out.printf("Time to pass %,d doubles to another thread and back was %,d ns.%n", d.length, time);
}

начинается плохо, но нагревается до ~ 50 у нас.

Time to pass 4,096 doubles to another thread and back was 1,098,045 ns.
Time to pass 4,096 doubles to another thread and back was 171,949 ns.
 ... deleted ...
Time to pass 4,096 doubles to another thread and back was 50,566 ns.
Time to pass 4,096 doubles to another thread and back was 49,937 ns.
1 голос
/ 15 февраля 2011

Я не уверен, как вы выполняете расчеты, но если вы разбиваете их на небольшие группы, тогда ваше приложение может быть готово для шаблона «Производитель / Потребитель».

Кроме того, вы можетезаинтересованы в использовании BlockingQueue .Потребители вычислений будут блокироваться до тех пор, пока что-то не окажется в очереди, и блокировка не произойдет при вызове take().

private static class Batch implements Runnable {
    CountDownLatch countDown;
    public Batch(CountDownLatch countDown) {
        this.countDown = countDown;
    }

    CountDownLatch getLatch(){
        return countDown;
    }

    @Override
    public void run() {         
        long t0 =System.nanoTime();
        long t = 0;
        while(t<TASK_TIME*1e6){ t = System.nanoTime() - t0; }

        if(countDown!=null) countDown.countDown();
    }
}

class CalcProducer implements Runnable {
    private final BlockingQueue queue;
    CalcProducer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while(true) { 
                CountDownLatch latch = new CountDownLatch(ntasks);
                for(int i = 0; i < ntasks; i++) {
                    queue.put(produce(latch)); 
                }
                // don't need to wait for the latch, only consumers wait
            }
        } catch (InterruptedException ex) { ... handle ...}
    }

    CalcGroup produce(CountDownLatch latch) {
        return new Batch(latch);
    }
}

class CalcConsumer implements Runnable {
    private final BlockingQueue queue;

    CalcConsumer(BlockingQueue q) { queue = q; }

    public void run() {
        try {
            while(true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ... handle ...}
    }

    void consume(Batch batch) { 
        batch.Run();
        batch.getLatch().await();
    }
}

class Setup {
    void main() {
        BlockingQueue<Batch> q = new LinkedBlockingQueue<Batch>();
        int numConsumers = 4;

        CalcProducer p = new CalcProducer(q);
        Thread producerThread = new Thread(p);
        producerThread.start();

        Thread[] consumerThreads = new Thread[numConsumers];

        for(int i = 0; i < numConsumers; i++)
        {
            consumerThreads[i] = new Thread(new CalcConsumer(q));
            consumerThreads[i].start();
        }
    }
}

Извините, если есть какие-либо синтаксические ошибки, я жаловался на код C # ииногда я забываю правильный синтаксис Java, но общая идея есть.

0 голосов
/ 15 февраля 2011

Хм, CachedThreadPool, кажется, создано только для вашего случая.Он не воссоздает потоки, если вы используете их достаточно быстро, и если вы тратите целую минуту, прежде чем использовать новый поток, издержки на создание потоков сравнительно незначительны.

Но вы не можете ожидать, что параллельное выполнение ускоритсяваши расчеты, если вы не можете также получить доступ к данным параллельно.Если вы используете обширную блокировку, много синхронизированных методов и т. Д., Вы потратите больше на издержки, чем на параллельную обработку.Убедитесь, что ваши данные могут эффективно обрабатываться параллельно, и что у вас нет неочевидных синхронизаций lurkinb в коде.

Кроме того, процессоры эффективно обрабатывают данные, если данные полностью помещаются в кэш.Если наборы данных каждого потока превышают половину кэша, два потока будут конкурировать за кэш и выполнять много операций чтения из ОЗУ, в то время как один поток, если использует только одно ядро, может работать лучше, поскольку он избегает чтения из ОЗУ в узком цикле, который он выполняет.Проверьте это тоже.

0 голосов
/ 15 февраля 2011

Вот набросок псевдо того, о чем я думаю

class WorkerThread extends Thread {

    Queue<Calculation> calcs;
    MainCalculator mainCalc;

    public void run() {
        while(true) {
            while(calcs.isEmpty()) sleep(500); // busy waiting? Context switching probably won't be so bad.
            Calculation calc = calcs.pop(); // is it pop to get and remove? you'll have to look
            CalculationResult result = calc.calc();
            mainCalc.returnResultFor(calc,result);      
        }
    }


}

Другой вариант, если вы вызываете внешние программы.Не помещайте их в цикл, который выполняет их по одному, иначе они не будут работать параллельно.Вы можете поместить их в цикл, который ОБРАБАТЫВАЕТ их по одному, но не выполняет их по одному.

Process calc1 = Runtime.getRuntime.exec("myCalc paramA1 paramA2 paramA3");
Process calc2 = Runtime.getRuntime.exec("myCalc paramB1 paramB2 paramB3");
Process calc3 = Runtime.getRuntime.exec("myCalc paramC1 paramC2 paramC3");
Process calc4 = Runtime.getRuntime.exec("myCalc paramD1 paramD2 paramD3");

calc1.waitFor();
calc2.waitFor();
calc3.waitFor();
calc4.waitFor();

InputStream is1 = calc1.getInputStream();
InputStreamReader isr1 = new InputStreamReader(is1);
BufferedReader br1 = new BufferedReader(isr1);
String resultStr1 = br1.nextLine();

InputStream is2 = calc2.getInputStream();
InputStreamReader isr2 = new InputStreamReader(is2);
BufferedReader br2 = new BufferedReader(isr2);
String resultStr2 = br2.nextLine();

InputStream is3 = calc3.getInputStream();
InputStreamReader isr3 = new InputStreamReader(is3);
BufferedReader br3 = new BufferedReader(isr3);
String resultStr3 = br3.nextLine();

InputStream is4 = calc4.getInputStream();
InputStreamReader isr4 = new InputStreamReader(is4);
BufferedReader br4 = new BufferedReader(isr4);
String resultStr4 = br4.nextLine();
...