Как управлять M потоками (по 1 на задачу), обеспечивая одновременно только N потоков. С Н <М. На Яве - PullRequest
5 голосов
/ 12 сентября 2009

У меня есть очередь задач в Java. Эта очередь находится в таблице в БД.

Мне нужно:

  • 1 поток только для задачи
  • Одновременно работает не более N потоков. Это связано с тем, что потоки взаимодействуют с БД, и я не хочу, чтобы открывалась куча соединений с БД.

Я думаю, я мог бы сделать что-то вроде:

final Semaphore semaphore = new Semaphore(N);
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        final CountDownLatch cdl = new CountDownLatch(tasks.size());
        for (final JobTask task : tasks) {
            Thread tr = new Thread(new Runnable() {

                @Override
                public void run() {
                    semaphore.acquire();
                    task.doWork();
                    semaphore.release();
                    cdl.countDown();
                }

            });
        }
        cdl.await();
    }
}

Я знаю, что класс ExecutorService существует, но я не уверен, смогу ли я использовать его для этого.

Итак, вы думаете, что это лучший способ сделать это? Или не могли бы вы разъяснить мне, как работает ExecutorService, чтобы решить эту проблему?

окончательное решение:

Я думаю, что лучшим решением будет что-то вроде:

while (isOnJob) {
    ExecutorService executor = Executors.newFixedThreadPool(N);
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        for (final JobTask task : tasks) {
            executor.submit(new Runnable() {

                @Override
                public void run() {
                    task.doWork();
                }

            });
        }
    }
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
}

Большое спасибо за это. Кстати, я использую пул соединений, но запросы к БД очень тяжелые, и я не хочу иметь неконтролируемое количество задач одновременно.

Ответы [ 4 ]

7 голосов
/ 12 сентября 2009

Вы действительно можете использовать ExecutorService. Например, создайте новый пул фиксированных потоков, используя метод newFixedThreadPool. Таким образом, кроме кеширования потоков, вы также гарантируете, что одновременно выполняется не более n потоков.

Что-то вроде этого:

private static final ExecutorService executor = Executors.newFixedThreadPool(N);
// ...
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (final JobTask task : tasks) {
                Future<?> future = executor.submit(new Runnable() {    
                        @Override
                        public void run() {
                                task.doWork();
                        }
                });
                futures.add(future);
        }
        // you no longer need to use await
        for (Future<?> fut : futures) {
          fut.get();
        }
    }
}

Обратите внимание, что вам больше не нужно использовать защелку, поскольку get будет ждать завершения вычислений, если это необходимо.

4 голосов
/ 12 сентября 2009

Я согласен с JG в том, что ExecutorService - это путь ... но я думаю, что вы оба усложняете ситуацию.

Вместо того, чтобы создавать большое количество потоков (по 1 на задачу), почему бы просто не создать пул потоков фиксированного размера (с Executors.newFixedThreadPool(N)) и отправить в него все задачи? Нет необходимости в семафоре или чем-то подобном - просто отправляйте задания в пул потоков по мере их получения, и пул потоков будет обрабатывать их с до N потоков одновременно.

Если вы не собираетесь использовать более N потоков одновременно, зачем вам их создавать?

1 голос
/ 12 сентября 2009

Используйте экземпляр ThreadPoolExecutor с несвязанной очередью и фиксированным максимальным размером потоков, например, Executors.newFixedThreadPool (N) * +1004 *. Это примет большое количество задач, но только одновременно выполнит N .

Если вместо этого вы выберете ограниченную очередь (с емкостью N ), Executor отклонит выполнение задачи (насколько точно зависит от политики, которую вы можете настроить при работе с ThreadPoolExecutor напрямую, вместо использования фабрики Executors - см. RejectedExecutionHandler ).

Если вам нужен «реальный» контроль перегруженности, вам следует установить ограничение BlockingQueue вместимостью N . Извлеките задачи, которые вы хотите выполнить, из базы данных и поместите их в очередь - если она заполнится, вызывающий поток заблокируется. В другом потоке (возможно, также началось использование Executor API) вы берете задач из BlockingQueue и отправляете их Executor . Если BlockingQueue пусто, вызывающий поток также будет блокироваться. Чтобы указать, что вы закончили, используйте «специальный» объект (например, синглтон, который отмечает последний / последний элемент в очереди).

0 голосов
/ 12 сентября 2009

Достижение хорошей производительности также зависит от вида работы, которую необходимо выполнить в потоках. Если ваша БД является узким местом в обработке, я бы начал обращать внимание на то, как ваши потоки обращаются к БД. Использование пула соединений возможно в порядке. Это может помочь вам повысить пропускную способность, поскольку рабочие потоки могут повторно использовать соединения с БД из пула.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...