Я бы использовал BlockingQueue
. Вы можете попробовать что-то вроде этого:
final BlockingQueue<Message> q = new LinkedBlockingQueue<Message>();
// start all the worker threads,
// making them report progress by adding Messages into the queue
for (...) {
threadManager.submit(new Runnable() {
// initialize the worker
q.offer(new Message("10% completed"));
// do half of the work
q.offer(new Message("50% completed"));
// do the rest of the work
q.offer(new Message("100% completed"));
});
}
// after starting all the workers, call a blocking retrieval method
// on the queue, waiting for the messages
while (!(everything is done)) {
Message m = q.take();
// process the message
}
Идея состоит в том, что все рабочие потоки и основной поток имеют общую очередь блокировки. Рабочие потоки являются производителями, а основной поток - потребителями.
Здесь я использую .offer(...)
, но вы можете использовать .add(...)
, и вы получите то же самое в этом случае. Разница будет в том, что первый вернёт false, если очередь заполнена, а последний выдаст исключение. Это не может произойти здесь, так как я создал экземпляр неограниченной очереди (очевидно, вы можете получить OutOfMemoryError). Однако, если вы используете .put(...)
, вам придется иметь дело с неконтролируемым InterruptedException
(который в этом случае бесполезен: его нельзя выбросить, потому что очередь никогда не заполнена - она не ограничена), поэтому метод никогда не будет блокироваться).
Очевидно, что вы можете улучшить его, реорганизовав вызовы q.offer(...)
в метод reportStatus(...)
или что-то в этом роде.
Может быть, вы хотите что-то другое, чем я предложил выше. Возможно, вы хотите, чтобы основной поток получал асинхронные сообщения только тогда, когда каждый рабочий поток завершает свою работу.
Мое решение похоже на то, что было предложено Питером Лоури, с той разницей, что с этим решением управляющий поток получит уведомление, как только закончится какой-либо из рабочих потоков (в то время как с решением Питера это результат рабочих потоков будет получен по порядку - поэтому, если первый рабочий поток будет работать вечно, даже если все другие рабочие потоки завершат свою работу, вы об этом не узнаете). В зависимости от того, что вы делаете, вы можете повысить производительность.
Обратите внимание, что с помощью этого решения вы не сможете определить, какой поток завершил свою работу (для этого у вашего Worker
класса должен быть метод типа .getJobId()
или что-то подобное), в то время как с решением Питера вы будете точно знать какой поток завершил свою работу (поскольку вы выполняете итерацию в списке futures
в том же порядке, в котором вы создали экземпляр и запустили Workers).
Если это так, вы можете использовать ExecutorCompletionService
. Он инкапсулирует ExecutorService
и предоставляет метод .take()
(блокирующий поиск), который вернет Future
s из службы исполнителя сразу после их завершения:
ExecutorCompletionService<Worker> ecs = new ExecutorCompletionService(executorService);
// Worker has to be a callable
// Now you submit your tasks to the ecs:
for (...) {
ecs.submit(new Worker(...));
}
// Now you block on .take():
while (!(everything is done)) {
Future<Worker> f = ecs.take();
// f.get(), for example, to check the results
}
CompletionService
гарантирует, что, как только вы возьмете Future
, вызов его метода .get()
будет немедленно завершен: либо путем возврата результата, либо с помощью исключения ExecutionException, инкапсулирующего исключение, выданное Callable
.