Как реализовать простую многопоточность с фиксированным количеством рабочих потоков - PullRequest
49 голосов
/ 24 сентября 2008

Я ищу самый простой и простой способ реализовать следующее:

  • Основная программа создает экземпляр работника темы для выполнения задачи.
  • Только n задачи могут быть запущены одновременно.
  • Когда достигнуто n, рабочих больше нет запускаются до подсчета текущие потоки возвращаются ниже n.

Ответы [ 7 ]

53 голосов
/ 24 сентября 2008

Я думаю, что Executors.newFixedThreadPool соответствует вашим требованиям. Существует несколько различных способов использования получающегося в результате ExecutorService, в зависимости от того, хотите ли вы вернуть результат в основной поток, или является ли задача полностью автономной, и есть ли у вас набор задач, которые нужно выполнить заранее, или ставятся ли задачи в очередь в ответ на какое-либо событие.

  Collection<YourTask> tasks = new ArrayList<YourTask>();
  YourTask yt1 = new YourTask();
  ...
  tasks.add(yt1);
  ...
  ExecutorService exec = Executors.newFixedThreadPool(5);
  List<Future<YourResultType>> results = exec.invokeAll(tasks);

В качестве альтернативы, если у вас есть новая асинхронная задача для выполнения в ответ на какое-либо событие, вы, вероятно, просто хотите использовать простой метод execute(Runnable) в ExecutorService.

23 голосов
/ 24 сентября 2008
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
    /* ...execute the task to run concurrently as a runnable: */
    exec.execute(new Runnable() {
        public void run() {
            /* do the work to be done in its own thread */
            System.out.println("Running in: " + Thread.currentThread());
        }
    });
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
    /* The tasks are now running concurrently. We wait until all work is done, 
     * with a timeout of 50 seconds: */
    boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
    /* If the execution timed out, false is returned: */
    System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }
6 голосов
/ 24 сентября 2008

Executors.newFixedThreadPool (INT)

Executor executor = Executors.newFixedThreadPool(n);

Runnable runnable = new Runnable() {
 public void run() {
  // do your thing here
 }
}

executor.execute(runnable);
2 голосов
/ 24 сентября 2008

Использование фреймворка Executor; а именно newFixedThreadPool (N)

1 голос
/ 31 января 2016
  1. Если ваша очередь задач не будет неограниченной и задачи могут выполняться за более короткие промежутки времени, вы можете использовать Executors.newFixedThreadPool(n); как подсказывают эксперты.

    Единственным недостатком этого решения является неограниченный размер очереди задач. Вы не можете это контролировать. Огромное накопление в очереди задач ухудшит производительность приложения и может вызвать нехватку памяти в некоторых сценариях.

  2. Если вы хотите использовать ExecutorService и включить механизм work stealing, где свободные рабочие потоки распределяют рабочую нагрузку из занятых рабочих потоков путем кражи задач в очереди задач , Он будет возвращать тип ForkJoinPool службы Executor.

    публичная статика ExecutorService newWorkStealingPool (внутри параллелизма)

    Создает пул потоков, который поддерживает достаточно потоков для поддержки заданного уровня параллелизма, и может использовать несколько очередей для уменьшения конкуренции. Уровень параллелизма соответствует максимальному количеству потоков, активно участвующих или доступных для обработки задач. Фактическое количество потоков может расти и уменьшаться динамически. Пул кражи работ не дает никаких гарантий относительно порядка выполнения представленных задач.

  3. Я предпочитаю ThreadPoolExecutor из-за гибкости в API для управления многими параметрами, которые контролируют выполнение задачи потока.

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    

в вашем случае установите оба corePoolSize and maximumPoolSize as N. Здесь вы можете контролировать размер очереди задач, определять собственную фабрику потоков и политику обработки отклонений.

Посмотрите на связанный вопрос SE для динамического управления размером пула:

Динамический пул потоков

0 голосов
/ 24 сентября 2008

Как уже упоминали другие, лучше всего создать пул потоков с классом Executors :

Однако, если вы хотите свернуть свой собственный, этот код должен дать вам представление о том, как действовать дальше. По сути, просто добавьте каждый новый поток в группу потоков и убедитесь, что в группе никогда не бывает больше N активных потоков:

Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
    if( group.activeCount()<N && i<tasks.length ) {
        new TaskThread(group, tasks[i]).start();
        i++;
    } else {
        Thread.sleep(100);
    }
}
0 голосов
/ 24 сентября 2008

Если вы хотите бросить свой собственный:

private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);

private boolean roomLeft() {
    synchronized (workers) {
        return (workers.size() < MAX_WORKERS);
    }
}

private void addWorker() {
    synchronized (workers) {
        workers.add(new Worker(this));
    }
}

public void removeWorker(Worker worker) {
    synchronized (workers) {
        workers.remove(worker);
    }
}

public Example() {
    while (true) {
        if (roomLeft()) {
            addWorker();
        } 
    }
}

Где Worker - ваш класс, расширяющий Thread. Каждый работник будет вызывать метод removeWorker этого класса, передавая себя в качестве параметра, когда он завершит свое дело.

С учетом сказанного, среда Executor выглядит намного лучше.

Редактировать: Кто-нибудь хочет объяснить, почему это так плохо, вместо того, чтобы просто уменьшить его?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...