Java: уведомить основной класс, когда все потоки в пуле потоков завершены / один и тот же экземпляр объекта в разных потоках - PullRequest
3 голосов
/ 17 июля 2011

Как мне уведомить мой основной класс, который создает экземпляр ThreadPoolExecutor, когда все потоки в ThreadPoolExecutor завершены?

ThreadPoolExecutor threadPool = null;
ThreadClass threadclass1;
ThreadClass threadclass2;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);

puclic MyClass(){
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);

        threadClass1 = new ThreadClass;
        threadClass2 = new ThreadClass;

        threadPool.execute(threadClass1);
        threadPool.execute(threadClass2);

        //Now I would like to do something until the threadPool is done working
        //The threads fill a ConcurrentLinkedQueueand I would like to poll
        //the queue as it gets filled by the threads and output 
        //it to XML via JAX-RS

}

РЕДАКТИРОВАТЬ 1

Как мои потоки извлекают данные изгде-то и заполнить эту информацию в ConcurrentLinkedQueue Я в основном хотел бы выполнить какое-то действие в MyClass, чтобы обновить вывод XML с результатами.Когда все потоки завершены, я хотел бы вернуть true веб-службе JAX-RS, которая создала экземпляр MyClass, чтобы веб-служба знала, что все данные были выбраны, и теперь может отображать окончательный файл XML

EDIT 2

Я передаю Queue потокам, чтобы они могли добавлять элементы в очередь.Когда один driver завершает добавление элементов в articleQueue, я хочу выполнить действие в своем основном классе, опрашивая сущность из Queue и передавая ее объекту response, чтобы каким-то образом отобразить его.

Когда я передаю очередь потокам, работают ли они с одним и тем же объектом или с «копией» объекта, чтобы изменения в потоке не влияли на основной объект?Это не то поведение, которое я хочу.Когда я проверяю размер articleQueue в Driver, он равен 18, размер articleQueue в DriverController равен 0.

Есть ли лучший способреагировать, когда поток добавил что-то в очередь, кроме моего цикла while?Как мне изменить мой код для доступа к одному и тому же объекту в разных классах?

DriverController

public class DriverController {

    Queue<Article> articleQueue;

    ThreadPoolExecutor threadPool = null;
    final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            maxPoolSize);

    public DriverController(Response response) {

        articleQueue = new ConcurrentLinkedQueue<Article>();
        threadPool = new ThreadPoolExecutor();
        Driver driver = new Driver(this.articleQueue);

        threadPool.execute(driver);
        // More drivers would be executed here which add to the queue

        while (threadPool.getActiveCount() > 0) {
            // this.articleQueue.size() gives back 0 here ... why?
            if(articleQueue.size()>0){
                response.addArticle(articleQueue.poll());
            }
        }

    }
}

Driver

public class Driver implements Runnable{

    private Queue<Article> articleQueue;

    public DriverAlliedElectronics(Queue articleQueue) {
        this.articleQueue = articleQueue;
    }

    public boolean getData() {
        // Here would be the code where the article is created ...

        this.articleQueue.offer(article);
        return true;
    }

    public void run() {
        this.getData();
        // this.articleQueue.size() gives back 18 here ...

    }
}

Ответы [ 6 ]

4 голосов
/ 17 июля 2011

Вы должны попробовать использовать следующий фрагмент

//Now I would like to wait until the threadPool is done working
threadPool.shutdown();
while (!threadPool.isTerminated()) {
    try {
        threadPool.awaitTermination(10, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
2 голосов
/ 17 июля 2011

Возможно ExecutorCompletionService может быть правильным для вас:

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html

Пример по ссылке выше:

void solve(Executor e, Collection<Callable<Result>> solvers)
  throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null) 
            use(r);
    }
}
2 голосов
/ 17 июля 2011

Вместо использования execute вы должны использовать submit . Это вернет экземпляр Future , в котором вы можете дождаться завершения задачи (й). Таким образом, вам не нужно опрашивать или закрывать бассейн.

2 голосов
/ 17 июля 2011

Я не думаю, что есть способ сделать это явно. Вы можете опросить getCompletedTaskCount(), чтобы дождаться, пока это станет равным нулю.

Почему бы не собрать Future объекты, возвращенные при отправке, и проверить все, что было завершено? Просто позвоните get() по очереди. Так как этот вызов блокирует, вы просто будете ждать каждого из них по очереди и постепенно проваливаться, пока не дождетесь каждого включения.

В качестве альтернативы вы можете отправить потоки и вызвать shutdown() для исполнителя. Таким образом, отправленные задачи будут выполнены, а затем вызван метод terminated (). Если вы переопределите это, вы получите обратный вызов, как только все задачи будут выполнены (очевидно, вы не сможете снова использовать этого исполнителя).

1 голос
/ 17 июля 2011

Я думаю, что вы немного перерабатываете вещи.Вы действительно не заботитесь о потоках или пуле потоков, и это правильно.Java предоставляет хорошие абстракции, так что вам не нужно.Вам просто нужно знать, когда ваши задачи выполнены, и методы для этого существуют.Просто отправьте свою работу и подождите, пока будущее не скажет, что все готово.Если вы действительно хотите знать, как только одна задача будет выполнена, вы можете посмотреть все фьючерсы и принять меры, как только одна из них будет завершена.Если нет, и вы заботитесь только о том, чтобы все было завершено, вы можете удалить некоторые сложности из кода, который я собираюсь опубликовать.Попробуйте это по размеру (обратите внимание, что MultithreadedJaxrsResource - исполняемый файл):

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.*;
import java.util.concurrent.*;

@Path("foo")
public class MultithreadedJaxrsResource {
    private ExecutorService executorService;

    public MultithreadedJaxrsResource(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @GET
    @Produces(MediaType.APPLICATION_XML)
    public AllMyArticles getStuff() {
        List<Future<Article>> futures = new ArrayList<Future<Article>>();
        // Submit all the tasks to run
        for (int i = 0; i < 10; i++) {
            futures.add(executorService.submit(new Driver(i + 1)));
        }
        AllMyArticles articles = new AllMyArticles();
        // Wait for all tasks to finish
        // If you only care that everything is done and not about seeing
        // when each one finishes, this outer do/while can go away, and
        // you only need a single for loop to wait on each future.
        boolean allDone;
        do {
            allDone = true;
            Iterator<Future<Article>> futureIterator = futures.iterator();
            while (futureIterator.hasNext()) {
                Future<Article> future =  futureIterator.next();
                if (future.isDone()) {
                    try {
                        articles.articles.add(future.get());
                        futureIterator.remove();
                    } catch (InterruptedException e) {
                        // thread was interrupted. don't do that.
                        throw new IllegalStateException("broken", e);
                    } catch (ExecutionException e) {
                        // execution of the Callable failed with an
                        // exception. check it out.
                        throw new IllegalStateException("broken", e);
                    }
                } else {
                    allDone = false;
                }
            }
        } while (!allDone);
        return articles;
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        AllMyArticles stuff =
            new MultithreadedJaxrsResource(executorService).getStuff();
        System.out.println(stuff.articles);
        executorService.shutdown();
    }
}

class Driver implements Callable<Article> {
    private int i; // Just to differentiate the instances

    public Driver(int i) {
        this.i = i;
    }

    public Article call() {
        // Simulate taking some time for each call
        try {
            Thread.sleep(1000 / i);
        } catch (InterruptedException e) {
            System.err.println("oops");
        }
        return new Article(i);
    }
}

class AllMyArticles {
    public final List<Article> articles = new ArrayList<Article>();
}

class Article {
    public final int i;

    public Article(int i) {
        this.i = i;
    }

    @Override
    public String toString() {
        return "Article{" +
                       "i=" + i +
                       '}';
    }
}

После этого вы можете ясно видеть, что задачи возвращаются в порядке их завершения, поскольку последняя задача завершается первой благодаря спящемукратчайшее времяЕсли вас не волнует порядок выполнения и вы просто хотите дождаться завершения всех, цикл становится намного проще:

for (Future<Article> future : futures) {
    try {
        articles.articles.add(future.get());
    } catch (InterruptedException e) {
        // thread was interrupted. don't do that.
        throw new IllegalStateException("broken", e);
    } catch (ExecutionException e) {
        // execution of the Callable failed with an exception. check it out.
        throw new IllegalStateException("broken", e);
    }
}
1 голос
/ 17 июля 2011

Судя по справочной документации у вас есть несколько вариантов:

ThreadPoolExecutor threadPool = null;
ThreadClass threadclass1;
ThreadClass threadclass2;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);

puclic MyClass(){
    threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);

    threadClass1 = new ThreadClass;
    threadClass2 = new ThreadClass;

    threadPool.execute(threadClass1);
    threadPool.execute(threadClass2);

    //Now I would like to wait until the threadPool is done working

    //Option 1:  shutdown() and awaitTermination()
    threadPool.shutDown();
    try {
        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }

    //Option 2:  getActiveCount()
    while (threadPool.getActiveCount() > 0) {
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException ignored) {}
    }

    //Option 3:  getCompletedTaskCount()
    while (threadPool.getCompletedTaskCount() < totalNumTasks) {
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException ignored) {}
    }

}

Учитывая все обстоятельства, я думаю, shutdown() и awaitTermination() - лучший вариант из трех.

...