Удивительная точка безубыточности производительности ExecutorService - практические правила? - PullRequest
18 голосов
/ 30 октября 2009

Я пытаюсь понять, как правильно использовать Исполнители Java. Я понимаю, что отправка задач на ExecutorService имеет свои издержки. Тем не менее, я удивлен, увидев, что он такой же высокий, как и сейчас.

Моя программа должна обрабатывать огромное количество данных (данных фондового рынка) с минимально возможной задержкой. Большинство расчетов являются довольно простыми арифметическими операциями.

Я пытался проверить что-то очень простое: "Math.random() * Math.random()"

Самый простой тест запускает это вычисление в простом цикле. Второй тест выполняет те же вычисления внутри анонимного Runnable (он должен измерять стоимость создания новых объектов). Третий тест передает Runnable на ExecutorService (это измеряет стоимость представления исполнителей).

Я выполнил тесты на своем изящном ноутбуке (2 процессора, 1,5 ГБ оперативной памяти):

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

(примерно один из четырех прогонов, первые два числа в конечном итоге равны)

Обратите внимание, что исполнители занимают гораздо больше времени, чем выполнение в одном потоке. Числа были примерно одинаковыми для размеров пула потоков от 1 до 8.

Вопрос: Я что-то упускаю из виду или эти результаты ожидаются? Эти результаты говорят мне, что любая задача, которую я передаю исполнителю, должна выполнять нетривиальные вычисления. Если я обрабатываю миллионы сообщений и мне нужно выполнять очень простые (и дешевые) преобразования для каждого сообщения, я все еще не могу использовать исполнителей ... попытка распределить вычисления по нескольким ЦП может оказаться более дорогой, чем просто делать их в один поток. Проектное решение становится намного сложнее, чем я думал. Есть мысли?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}

Ответы [ 9 ]

19 голосов
/ 30 октября 2009
  1. Использование executors - это использование ЦП и / или ядер ЦП, поэтому, если вы создаете пул потоков, который в лучшем случае использует количество ЦП, вы должны иметь столько потоков, сколько ЦП / ядер.
  2. Вы правы, создание новых объектов стоит слишком дорого. Таким образом, один из способов сократить расходы - использовать партии. Если вы знаете тип и количество вычислений, которые нужно сделать, вы создаете пакеты. Так что подумайте о тысячах вычислений, выполненных за одно выполненное задание. Вы создаете партии для каждого потока. Как только вычисления будут выполнены (java.util.concurrent.Future), вы создадите следующий пакет. Даже создание новых пакетов может быть выполнено в parralel (4 процессора -> 3 потока для вычислений, 1 поток для пакетной подготовки). В конце концов, вы можете получить большую пропускную способность, но с более высокими требованиями к памяти (пакеты, выделение ресурсов).

Редактировать: я изменил ваш пример и позволил ему работать на моем маленьком двухъядерном ноутбуке x200.

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

Как вы видите в исходном коде, я также вывел из измерения жизненный цикл пакетной инициализации и исполнителя. Это более справедливо по сравнению с двумя другими методами.

Смотрите результаты самостоятельно ...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: Executor#shutdown() and Executor#awaitTermination() requires
        // some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            while ( countdown-- > -1 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}
7 голосов
/ 30 октября 2009

Это неправильный тест для пула потоков по следующим причинам,

  1. Вы вообще не пользуетесь пулом, потому что у вас есть только 1 поток.
  2. Задание слишком простое, поэтому затраты на объединение не могут быть оправданы. Умножение на CPU с FPP занимает всего несколько циклов.

Учитывая следующие дополнительные шаги, которые должен выполнять пул потоков помимо создания объекта и запуска задания,

  1. Поместить задание в очередь
  2. Удалить задание из очереди
  3. Получить поток из пула и выполнить задание
  4. Вернуть нить в пул

Когда у вас есть реальная работа и несколько потоков, выгода от использования пула потоков будет очевидна.

4 голосов
/ 19 ноября 2014

Упомянутые вами «издержки» не имеют ничего общего с ExecutorService, они вызваны синхронизацией нескольких потоков в Math.random, что создает конфликт блокировки.

Так что да, вы что-то упустили (и «правильный» ответ ниже на самом деле не верен).

Вот некоторый код Java 8, демонстрирующий 8 потоков, выполняющих простую функцию, в которой нет конфликта блокировок:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }    

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {      
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }        
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () ->  { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }


}

Синхронизация результатов для 120 тестов по 25 тыс. Операций (мс):

  • Синхронное время выполнения: 9956
  • Время выполнения синхронных партий: 9900
  • Поток за время выполнения партии: 2176
  • Время исполнения пула исполнителей: 1922

Победитель: Служба Исполнителя.

4 голосов
/ 25 июля 2012

Math.random () фактически синхронизируется на одном генераторе случайных чисел. Вызов Math.random () приводит к значительному конфликту для генератора чисел. На самом деле, чем больше у вас потоков, тем медленнее будет.

Из Math.random () javadoc:

Этот метод правильно синхронизирован, чтобы обеспечить более одна нить. Однако, если много потоков нужно генерировать псевдослучайный номера с большой скоростью, это может уменьшить конкуренцию для каждого потока имеет собственный генератор псевдослучайных чисел.

4 голосов
/ 30 октября 2009

Я не думаю, что это вообще реально, так как вы создаете новую службу executor каждый раз, когда делаете вызов метода. Если у вас нет очень странных требований, которые кажутся нереальными - обычно вы создаете сервис при запуске приложения, а затем отправляете ему задания.

Если вы снова попробуете сравнительный анализ, но инициализируете службу как поле , один раз, вне цикла синхронизации; тогда вы увидите фактические накладные расходы, связанные с отправкой Runnables в службу и выполнением их самостоятельно.

Но я не думаю, что вы полностью поняли суть - исполнители не предназначены для повышения эффективности, они там для того, чтобы упростить координацию и передачу работы пулу потоков. Они всегда будут менее эффективными, чем просто вызов Runnable.run() самостоятельно (поскольку в конце дня служба исполнителя все еще должна сделать это, предварительно выполнив некоторые дополнительные действия по ведению домашнего хозяйства). Когда вы используете их из нескольких потоков, нуждающихся в асинхронной обработке, они действительно сияют.

Также учтите, что вы смотрите на относительную разницу во времени по существу с фиксированными затратами (накладные расходы исполнителя равны, независимо от того, выполняются ли ваши задачи 1 мс или 1 час) по сравнению с очень маленькой переменной величиной (ваш тривиальный пробег) Если для выполнения задачи в 1 мс службе исполнителя требуется 5 мс, это не очень благоприятный показатель. Если для выполнения 5-секундной задачи требуется дополнительно 5 мс (например, нетривиальный SQL-запрос), это совершенно незначительно и стоит того.

Так что, в некоторой степени, это зависит от вашей ситуации - если у вас есть чрезвычайно критичный ко времени раздел, выполняющий множество небольших задач, которые не нужно выполнять параллельно или асинхронно, тогда вы ничего не получите от исполнителя , Если вы обрабатываете более тяжелые задачи параллельно и хотите отвечать асинхронно (например, через веб-приложение), тогда исполнители - это здорово.

То, являются ли они лучшим выбором для вас, зависит от вашей ситуации, но на самом деле вам нужно попробовать тесты с реалистичными репрезентативными данными. Я не думаю, что было бы уместно делать какие-либо выводы из проведенных вами тестов, если только ваши задачи не являются настолько тривиальными (и вы не хотите повторно использовать экземпляр executor ...).

2 голосов
/ 21 августа 2016

Вот результаты на моей машине (OpenJDK 8 на 64-битной Ubuntu 14.0, Thinkpad W530)

simpleCompuation:6
computationWithObjCreation:5
computationWithObjCreationAndExecutors:33

Там наверняка накладные расходы. Но помните, что это за числа: миллисекунды для 100 000 итераций . В вашем случае издержки были около 4 микросекунд на итерацию. Для меня накладные расходы составили около четверти микросекунды.

Издержки - это синхронизация, внутренние структуры данных и, возможно, отсутствие оптимизации JIT из-за сложных путей кода (безусловно, более сложных, чем ваш цикл for).

Задачи, которые вы на самом деле хотели бы распараллелить, стоили бы того, несмотря на накладные расходы в четверть микросекунды.


К вашему сведению, это было бы очень плохим вычислением для распараллеливания. Я увеличил поток до 8 (количество ядер):

simpleCompuation:5
computationWithObjCreation:6
computationWithObjCreationAndExecutors:38

Это не сделало это быстрее. Это потому, что Math.random() синхронизирован.

0 голосов
/ 08 июня 2014

Вам нужно как-то сгруппировать выполнение, чтобы передавать большие части вычислений каждому потоку (например, строить группы на основе символа акции). Я получил лучшие результаты в подобных сценариях, используя Disruptor. У него очень низкие накладные расходы. По-прежнему важно группировать задания, но наивный циклический перебор обычно создает много ошибок в кеше.

см. http://java -is-the-new-c.blogspot.de / 2014/01 / сравнение различных параллелей.html

0 голосов
/ 11 февраля 2011

Конечной целью Fixed ThreadPool является повторное использование уже созданных потоков. Таким образом, прирост производительности проявляется в отсутствии необходимости воссоздавать новый поток каждый раз при отправке задачи. Следовательно, время остановки должно быть принято внутри представленного задания. Просто в последнем утверждении метода run.

0 голосов
/ 30 октября 2009

Во-первых, есть несколько проблем с микробенчмарком. Вы делаете разминку, и это хорошо. Однако лучше выполнить тест несколько раз, что должно дать представление о том, действительно ли он прогрелся, и об отклонениях результатов. Также лучше проводить тестирование каждого алгоритма в отдельных прогонах, иначе вы можете вызвать деоптимизацию при изменении алгоритма.

Задача очень маленькая, хотя я не совсем уверен, насколько она мала. Так что число раз быстрее это довольно бессмысленно. В многопоточных ситуациях он будет касаться одних и тех же изменчивых мест, поэтому потоки могут привести к очень плохой производительности (используйте экземпляр Random на поток). Также пробежка в 47 миллисекунд немного короткая.

Конечно, переход к другому потоку для крошечной операции не будет быстрым. Разделите задачи на большие размеры, если это возможно. JDK7 выглядит так, как будто у него будет инфраструктура fork-join, которая пытается поддерживать тонкие задачи из алгоритмов «разделяй и властвуй», предпочитая выполнять задачи в одном и том же потоке по порядку, причем более крупные задачи извлекаются простыми потоками.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...