Разница между многопоточностью с и без исполнителя - PullRequest
2 голосов
/ 07 марта 2012

Я пытаюсь выяснить разницу в производительности между обычной многопоточностью и многопоточностью с помощью executor (для поддержки пула потоков).

Ниже приведены примеры кода для обоих.

Без кода исполнителя (с многопоточностью):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadMXBean;
import java.util.List;

public class Demo1 {
public static void main(String arg[]) {
    Demo1 demo = new Demo1();
    Thread t5  = new Thread(new Runnable() {
       public void run() {
              int count=0;
              // Thread.State;
              // System.out.println("ClientMsgReceiver started-----");
              Demo1.ChildDemo  obj = new Demo1.ChildDemo();
              while(true) {

                // System.out.println("Threadcount is"+Thread);
                // System.out.println("count is"+(count++));
                Thread t=new Thread(obj);
                t.start();
                ThreadMXBean tb = ManagementFactory.getThreadMXBean();
                List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
                for (MemoryPoolMXBean pool : pools) {
                   MemoryUsage peak = pool.getPeakUsage();
                   System.out.format("Peak %s memory used: %,d%n",
                             pool.getName(), peak.getUsed());
                   System.out.format("Peak %s memory reserved: %,d%n",
                             pool.getName(), peak.getCommitted());
                } 

                System.out.println("Current Thread Count"+ tb.getThreadCount());
                System.out.println("Peak Thread Count"+ tb.getPeakThreadCount());
                System.out.println("Current_Thread_Cpu_Time " 
                         + tb.getCurrentThreadCpuTime());
                System.out.println("Daemon Thread Count" +tb.getDaemonThreadCount());
       }
       // ChatLogin = new ChatLogin();
     }
  });
  t5.start();
}

static class ChildDemo implements Runnable {
   public void run() {
        try {
        //  System.out.println("Thread Started with custom Run method");
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally {
            System.out.println("A" +Thread.activeCount());
        }
    }
  }
}

С исполнителем (многопоточность):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadMXBean;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Executor_Demo {
public static void main(String arg[]) {
   BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);
   ThreadPoolExecutor executor   = new ThreadPoolExecutor(
          10, 100, 10, TimeUnit.MICROSECONDS, queue);
   Executor_Demo demo = new Executor_Demo();

   executor.execute(new Runnable() {
       public void run() {
          int count=0;
          // System.out.println("ClientMsgReceiver started-----");
          Executor_Demo.Demo demo2 = new Executor_Demo.Demo();
          BlockingQueue<Runnable> queue1 = new ArrayBlockingQueue<Runnable>(1000);
          ThreadPoolExecutor executor1   = new ThreadPoolExecutor(
                  1000, 10000, 10, TimeUnit.MICROSECONDS, queue1);

          while(true) {
             // System.out.println("Threadcount is"+Thread);
             // System.out.println("count is"+(count++));
             Runnable command= new Demo();
             // executor1.execute(command);
             executor1.submit(command);         
             // Thread t=new Thread(demo2);
             // t.start();
             ThreadMXBean tb = ManagementFactory.getThreadMXBean();
             /* try {
                  executor1.awaitTermination(100, TimeUnit.MICROSECONDS);
                } catch (InterruptedException e) {
                   // TODO Auto-generated catch block
                   e.printStackTrace();
                } */
              List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
              for (MemoryPoolMXBean pool : pools) {
                 MemoryUsage peak = pool.getPeakUsage();
                 System.out.format("Peak %s memory used: %,d%n",
                          pool.getName(), peak.getUsed());
                 System.out.format("Peak %s memory reserved: %,d%n",
                          pool.getName(), peak.getCommitted());
          }
              System.out.println("daemon threads"+tb.getDaemonThreadCount());
              System.out.println("All threads"+tb.getAllThreadIds());
              System.out.println("current thread CPU time "
                       + tb.getCurrentThreadCpuTime());
              System.out.println("current thread user time "
                       + tb.getCurrentThreadUserTime());
              System.out.println("Total started thread count " 
                       + tb.getTotalStartedThreadCount());
              System.out.println("Current Thread Count"+ tb.getThreadCount());
              System.out.println("Peak Thread Count"+ tb.getPeakThreadCount());
              System.out.println("Current_Thread_Cpu_Time " 
                       + tb.getCurrentThreadCpuTime());
              System.out.println("Daemon Thread Count"
                       + tb.getDaemonThreadCount());
              // executor1.shutdown();  
            }
            //ChatLogin = new ChatLogin();
          }
     });
     executor.shutdown();
}

static class Demo implements Runnable {
    public void run() {
      try {
        // System.out.println("Thread Started with custom Run method");
        Thread.sleep(100000);
      } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
      }
      finally {
         System.out.println("A" +Thread.activeCount());
      }
   }
  }
}

Пример вывода Output

Когда я запускаю обе программы, оказывается, что исполнитель обходится дороже, чем обычная многопоточность.почему это так?

И, учитывая это, какая польза от исполнителя?Мы используем executor для управления пулами потоков.

Я бы ожидал, что executor даст лучшие результаты, чем обычная многопоточность.

В основном я делаю это, поскольку мне нужно обрабатывать миллионы клиентов, используяпрограммирование сокетов с многопоточностью.

Любые предложения будут полезны.

Ответы [ 2 ]

2 голосов
/ 07 марта 2012

Каждый поток потребляет память для стека, что-то от 256K до 1M.Вы можете установить размер стека вручную, но опасно устанавливать его ниже 128K.Так что если у вас 2G памяти и вы можете позволить себе потратить 1/2 на потоки, у вас будет не более 8K потоков.Если это нормально, используйте обычную многопоточность (каждый Runnable имеет свой собственный стек).Если вы не хотите или не можете тратить столько памяти на каждый Runnable, используйте Executor.Установите размер пула потоков равным числу процессоров (Runtime.availableProcessors ()) или в несколько раз больше.Основная проблема заключается в том, что вы не можете сделать Thread.sleep () или иным образом заблокировать поток в вашем работающем (скажем, ожидать ответа пользователя), потому что такая блокировка фактически исключает поток из обслуживания.В результате, если вы используете пул потоков ограниченного размера, возникает так называемое «истощение потоков», которое фактически является тупиком.Если ваш пул потоков имеет неограниченный размер, то вы возвращаетесь к нормальной многопоточности и вскоре исчерпываете память.

Лечение состоит в том, чтобы использовать асинхронные операции, то есть настроить некоторый запрос с помощью обратного вызова и выйти изметод run ().Затем обратный вызов должен начать выполнение некоторого объекта Runnable (может быть, того же самого) с Executor.execute (Runnable), или он может выполнить сам метод runnable.run ().

В настоящее время присутствуют асинхронные операции ввода-выводав Java 7 (nio2), но мне не удалось заставить его обслуживать более нескольких сотен сетевых подключений.Для обслуживания сетевых подключений могут использоваться асинхронные сетевые библиотеки (например, Apache Netty).

Организация обратных вызовов и выполнение исполняемых модулей может потребовать сложной синхронизации.Чтобы упростить жизнь, рассмотрите возможность использования модели Actor (http://en.wikipedia.org/wiki/Actor_model),, где Actor - Runnable, выполняемый каждый раз, когда поступает входное сообщение. Существует множество библиотек Java Actor (например, https://github.com/rfqu/df4j).

2 голосов
/ 07 марта 2012

Чтобы увидеть, как что-то масштабируется, я бы постарался свести к минимуму стоимость мониторинга и сравнить небольшое число с большим числом.

public class Executor_Demo {
    public static void main(String... arg) throws ExecutionException, InterruptedException {
        int nThreads = 5100;
        ExecutorService executor = Executors.newFixedThreadPool(nThreads, new DaemonThreadFactory());

        List<Future<Results>> futures = new ArrayList<Future<Results>>();
        for (int i = 0; i < nThreads; i++) {
            futures.add(executor.submit(new BackgroundCallable()));
        }
        Results result = new Results();
        for (Future<Results> future : futures) {
            result.merge(future.get());
        }
        executor.shutdown();

        result.print(System.out);

    }

    static class Results {
        private long cpuTime;
        private long userTime;

        Results() {
            final ThreadMXBean tb = ManagementFactory.getThreadMXBean();
            cpuTime = tb.getCurrentThreadCpuTime();
            userTime = tb.getCurrentThreadUserTime();
        }


        public void merge(Results results) {
            cpuTime += results.cpuTime;
            userTime += results.userTime;
        }

        public void print(PrintStream out) {
            ThreadMXBean tb = ManagementFactory.getThreadMXBean();

            List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
            for (int i = 0, poolsSize = pools.size(); i < poolsSize; i++) {
                MemoryPoolMXBean pool = pools.get(i);
                MemoryUsage peak = pool.getPeakUsage();
                out.format("Peak %s memory used:\t%,d%n", pool.getName(), peak.getUsed());
                out.format("Peak %s memory reserved:\t%,d%n", pool.getName(), peak.getCommitted());
            }

            out.println("Total thread CPU time\t" + cpuTime);
            out.println("Total thread user time\t" + userTime);
            out.println("Total started thread count\t" + tb.getTotalStartedThreadCount());
            out.println("Current Thread Count\t" + tb.getThreadCount());
            out.println("Peak Thread Count\t" + tb.getPeakThreadCount());
            out.println("Daemon Thread Count\t" + tb.getDaemonThreadCount());
        }
    }

    static class DaemonThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    }

    static class BackgroundCallable implements Callable<Results> {
        @Override
        public Results call() throws Exception {
            Thread.sleep(100);
            return new Results();
        }
    }
}

при тестировании с -XX:MaxNewSize=64m (этоограничивает размер временных областей памяти, будет увеличиваться)

100 threads
Peak Code Cache memory used:    386,880
Peak Code Cache memory reserved:    2,555,904
Peak PS Eden Space memory used: 41,280,984
Peak PS Eden Space memory reserved: 50,331,648
Peak PS Survivor Space memory used: 0
Peak PS Survivor Space memory reserved: 8,388,608
Peak PS Old Gen memory used:    0
Peak PS Old Gen memory reserved:    192,675,840
Peak PS Perm Gen memory used:   3,719,616
Peak PS Perm Gen memory reserved:   21,757,952
Total thread CPU time   20000000
Total thread user time  20000000
Total started thread count  105
Current Thread Count    93
Peak Thread Count   105
Daemon Thread Count 92

5100 threads
Peak Code Cache memory used:    425,728
Peak Code Cache memory reserved:    2,555,904
Peak PS Eden Space memory used: 59,244,544
Peak PS Eden Space memory reserved: 59,244,544
Peak PS Survivor Space memory used: 2,949,152
Peak PS Survivor Space memory reserved: 8,388,608
Peak PS Old Gen memory used:    3,076,400
Peak PS Old Gen memory reserved:    192,675,840
Peak PS Perm Gen memory used:   3,787,096
Peak PS Perm Gen memory reserved:   21,757,952
Total thread CPU time   810000000
Total thread user time  150000000
Total started thread count  5105
Current Thread Count    5105
Peak Thread Count   5105
Daemon Thread Count 5104

Основное увеличение - увеличение используемого старого поколения ~ 3 МБ или около 6 КБ на поток.и процессор используется 956 мс или около 0,2 мс на поток.


В первом примере вы создаете один поток, а во втором - 1000.

ВыходВы выполняете, кажется, большую часть работы, и у вас гораздо больше выходных данных во втором случае, чем в первом.

Вы должны быть уверены, что ваши тестирование и мониторинг гораздо легче, чем вы хотите, чтобы вы пыталисьмонитор / меры.

...