Вызов ExecutorService.shutdown в динамическом номере потока - PullRequest
3 голосов
/ 21 марта 2012

Доброе утро в моем часовом поясе.

Я использую пул потоков для разработки маленького робота Http, который перемещается от ссылки к ссылке на каждой странице. Когда я нашел новую ссылку, я создаю новую тему, которая будет исследовать эту новую страницу. Псевдокод.

pool = Executors.newFixedThreadPool(40);

pool.execute(new Exploit(tree.getRoot()));

В этом случае Exploit является внутренним классом, который реализует интерфейс Runnable и имеет доступ к пулу, поэтому каждый раз, когда один поток находит ссылку, будет использовать пул для добавления нового «потока» следующим образом:

for(Link n : links){
   pool.execute(new Exploit(n));
 }

Я видел много примеров использования класса ExecutorService, но все они используют один и тот же код, подобный следующему:

ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
    Runnable worker = new MyRunnable(10000000L + i);
    executor.execute(worker);
}
   // This will make the executor accept no new threads
   // and finish all existing threads in the queue
    executor.shutdown();

В приведенном выше коде число потоков является статическим, поэтому, когда код вызывает завершение работы, все потоки уже были добавлены в пул. Я не могу следовать этому коду, потому что в моем случае у меня нет статического числа потоков добавить. Мое условие остановки для добавления большего количества потоков в пул - когда я достиг глубокого уровня поиска. Итак, мой вопрос, как мне вызвать executor.shutdown в основном потоке? Есть ли какое-либо соединение, которое я могу использовать в главном потоке?

Спасибо заранее. С наилучшими пожеланиями

Ответы [ 4 ]

2 голосов
/ 21 марта 2012

Вы можете взглянуть на Phaser . Вы по-прежнему можете использовать фиксированное количество потоков, но каждый раз, когда вы найдете ссылку, вы можете зарегистрировать другую сторону и отправить работоспособный объект на основе этой ссылки.

Phaser phaser = new Phaser(1);
ExecutorService e = Executors.newFixedThreadPool(n);

public void crawl(final String url){
   visit(url);
   phaser.arriveAndAwaitAdvance();
   e.shutdown();  
}

private void visit(String url){
    phaser.register();
    e.submit(new Runnable(){
        public void run(){
            //visit link maybe another visit(url)             
            phaser.arrive();
        } 
    });
}

На этом этапе e.shutdown () никогда не произойдет, пока не будут просмотрены все ссылки.

1 голос
/ 21 марта 2012

Вам необходимо отслеживать, сколько задач в данный момент находится в пуле.Увеличивайте счетчик перед каждым вызовом execute ().Затем уменьшайте счетчик в конце каждой задачи, убедитесь, что вы делаете это, даже если есть исключение.

Тогда код, который отключит исполнителя (один отправляет первое задание), должен подождать некоторое времяЦикл, чтобы увидеть, равен ли счетчик 0.

В убывающем коде должен использоваться notify для пробуждения основного потока.

class TaskCounter {
   private final Object lock = new Object();
   private long count;

   public void taskStart() {
      synchronize (lock) {
         count++;
      }
   }

   public void taskEnd() {
      synchronize (lock) {
         count--;
         if (count == 0) {
            lock.notify();
         }
      }
   }

   public void waitForAllTasksToComplete() throws InterruptedException {
      synchronize (lock) {
         while (count != 0) {
            lock.wait();
         }
      }
   }
}
0 голосов
/ 21 марта 2012

newFixedThreadPool будет только устанавливать количество потоков, выполняемых одновременно.Здесь не указано количество потоков, которые вы можете поместить в службу executor.Таким образом, вы можете добавить столько потоков, сколько захотите, в свой основной поток, запустить execute() и shutdown() ExecutorService, если считаете, что больше не добавите

0 голосов
/ 21 марта 2012

В показанном вами коде у вас do статическое число потоков. newFixedThreadPool создает пул потоков с фиксированным числом потоков.

Когда вы звоните pool.execute, вы не создаете новую тему. Вы создаете новую задачу, которая будет выполняться одним из существующих потоков. В этом весь смысл пула потоков.

...