Как лениво создавать задачи для использования пулом потоков Java - PullRequest
3 голосов
/ 07 марта 2012

Я пишу приложение для нагрузочного тестирования на Java и имею пул потоков, который выполняет задачи на тестируемом сервере.Таким образом, чтобы создать 1000 заданий и запустить их в 5 потоках, я делаю что-то вроде этого:

    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<Runnable> jobs = makeJobs(1000);
    for(Runnable job : jobs){
        pool.execute(job);
    }

Однако я не думаю, что этот подход будет очень хорошо масштабироваться, потому что мне нужно создать все объекты 'job'заранее и пусть они сидят в памяти, пока они не понадобятся.

Я ищу способ, чтобы потоки в пуле переходили к какому-то классу JobFactory каждый раз, когда им нужно новое задание, и чтобы фабрика собирала Runnables по запросу до требуемого числаработ были запущены.Фабрика может начать возвращать 'null', чтобы сигнализировать потокам, что больше нет работы.

Я мог бы написать что-то подобное вручную, но это кажется достаточно распространенным вариантом использования иИнтересно, есть ли что-нибудь в замечательном, но сложном пакете 'java.util.concurrent', который я мог бы использовать вместо этого?

Ответы [ 2 ]

5 голосов
/ 07 марта 2012

Вы можете выполнять всю работу в исполняющих потоках пулов потоков, используя AtomicInteger для контроля количества выполненных исполняемых объектов

 int numberOfParties = 5;
 AtomicInteger numberOfJobsToExecute = new AtomicInteger(1000);
 ExecutorService pool = Executors.newFixedThreadPool(numberOfParties );
 for(int i =0; i < numberOfParties; i++){
     pool.submit(new Runnable(){
        public void run(){
            while(numberOfJobsToExecute.decrementAndGet() >= 0){
                makeJobs(1).get(0).run();
            }
        }
     });
 }

Вы также можете сохранять возвращенные Future в List и get()на них ждать завершения (среди прочих механизмов)

4 голосов
/ 07 марта 2012

Хмм. Вы можете создать BlockingQueue<Runnable> с фиксированной емкостью, и каждый из ваших рабочих потоков выведет из очереди Runnable и запустит его. Тогда у вас может быть поток производителя, который помещает задания в очередь.

Основной поток будет делать что-то вроде:

// 100 is the capacity of the queue before blocking
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(100);
// start the submitter thread
new Thread(new JobSubmitterThread(queue)).start();
// make in a loop or something?
new Thread(new WorkerThread(queue)).start();
new Thread(new WorkerThread(queue)).start();
...

Рабочий будет выглядеть примерно так:

public class WorkerThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         // run until the main thread shuts it down using volatile boolean or ...
         while (!shutdown) {
             Runnable job = queue.take();
             job.run();
         }
     }
}

И соискатель будет выглядеть примерно так:

 public class JobSubmitterThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         for (int jobC = 0; jobC < 1000; jobC++) {
             Runnable job = makeJob();
             // this would block when the queue reaches capacity
             queue.put(job);
         }
     }
 }
...