Как заставить основной поток ждать завершения других потоков в ThreadPoolExecutor - PullRequest
2 голосов
/ 15 декабря 2009

Я использую ThreadPoolExecutor для реализации потоков в моем приложении Java.

У меня есть XML, который мне нужно проанализировать и добавить каждый его узел в поток, чтобы выполнить завершение. Моя реализация такова:

parse_tp - созданный объект пула потоков, а ParseQuotesXML - класс с методом run.

        try {     
           List children = root.getChildren();               
        Iterator iter = children.iterator();

        //Parsing the XML     
        while(iter.hasNext()) {       
           Element child = (Element) iter.next();           
           ParseQuotesXML quote = new ParseQuotesXML(child, this);         
           parse_tp.execute(quote);         
        }
    System.out.println("Print it after all the threads have completed");
        catch(Exception ex) {  
        ex.printStackTrace();      
        }
        finally {  
    System.out.println("Print it in the end.");
if(!parse_tp.isShutdown()) {
                if(parse_tp.getActiveCount() == 0 && parse_tp.getQueue().size() == 0 ) {
                    parse_tp.shutdown();                    
                } else {
                    try {
                        parse_tp.awaitTermination(30, TimeUnit.SECONDS);
                    } catch (InterruptedException ex) {
                        log.info("Exception while terminating the threadpool "+ex.getMessage());
                        ex.printStackTrace();
                    }
                }
            }
          parse_tp.shutdown();  
        }

Проблема в том, что два оператора вывода на печать печатаются до выхода из других потоков. Я хочу, чтобы основной поток ожидал завершения всех остальных потоков. В обычной реализации Thread я могу сделать это, используя функцию join (), но не получая способа добиться того же в ThreadPool Executor. Также хотел бы спросить, если код, написанный в блоке finally, закрыть собственно пул потоков?

Спасибо, Amit

Ответы [ 3 ]

4 голосов
/ 15 декабря 2009

A CountDownLatch предназначен именно для этой цели. Примеры можно найти здесь и здесь . Если число потоков заранее неизвестно, рассмотрим Phaser, новый в Java 1.7, или UpDownLatch.

3 голосов
/ 16 декабря 2009

Чтобы ответить на ваш второй вопрос, я думаю, что вы делаете разумную работу, пытаясь очистить свой пул потоков.

Что касается вашего первого вопроса, я думаю, что вы хотите использовать метод submit вместо execute. Вместо того, чтобы пытаться объяснить все это в тексте, вот отредактированный фрагмент из написанного мною модульного теста, который выполняет много задач, каждый из которых выполняет фрагмент всей работы, а затем встречается в начальной точке, чтобы добавить результаты:

final AtomicInteger messagesReceived = new AtomicInteger(0);

// ThreadedListenerAdapter is the class that I'm testing 
// It's not germane to the question other than as a target for a thread pool.
final ThreadedListenerAdapter<Integer> adapter = 
    new ThreadedListenerAdapter<Integer>(listener);
int taskCount = 10;

List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

for (int whichTask = 0; whichTask < taskCount; whichTask++) {
    FutureTask<Integer> futureTask = 
        new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            // Does useful work that affects messagesSent
            return messagesSent;
        }
    });
    taskList.add(futureTask);
}

for (FutureTask<Integer> task : taskList) {
    LocalExecutorService.getExecutorService().submit(task);
}

for (FutureTask<Integer> task : taskList) {
    int result = 0;
    try {
        result = task.get();
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
        throw new RuntimeException("ExecutionException in task " + task, ex);
    }
    assertEquals(maxMessages, result);
}

int messagesSent = taskCount * maxMessages;
assertEquals(messagesSent, messagesReceived.intValue());

Я думаю, что этот фрагмент похож на то, что вы пытаетесь сделать. Ключевыми компонентами были методы submit и get .

1 голос
/ 16 декабря 2009

Прежде всего вы можете использовать ThreadPoolExecutor.submit () метод, который возвращает экземпляр Future, затем после того, как вы отправили все свои рабочие элементы, вы можете выполнить итерацию по этим фьючерсам и вызвать Future.get ( ) на каждом из них.

В качестве альтернативы, вы можете подготовить свои рабочие элементы и запустить их все сразу, используя ThreadPoolExecutor.invokeAll () , который будет ожидать завершения всех рабочих элементов, а затем вы сможете получить результаты выполнения или вызов исключения. тот же метод Future.get ().

...