У меня есть код, где я выполняю несколько задач, используя Executors и Blocking Queue. Результаты должны быть возвращены как итератор, потому что именно этого ожидает приложение, над которым я работаю. Однако между заданием и результатами, добавляемыми в очередь, существует отношение 1: N, поэтому я не могу использовать ExecutorCompletionService . При вызове hasNext () мне нужно знать, когда все задачи завершены, и добавить все результаты в очередь, чтобы я мог остановить получение результатов из очереди. Обратите внимание, что, как только элементы помещаются в очередь, другой поток должен быть готов к потреблению ( Executor.invokeAll () , блокирует до тех пор, пока все задачи не будут выполнены, что не является тем, что я хочу, ни тайм-аут). Это была моя первая попытка, я использую AtomicInteger просто для демонстрации сути, даже если она не будет работать. Может ли кто-нибудь помочь мне понять, как я могу решить эту проблему?
public class ResultExecutor<T> implements Iterable<T> {
private BlockingQueue<T> queue;
private Executor executor;
private AtomicInteger count;
public ResultExecutor(Executor executor) {
this.queue = new LinkedBlockingQueue<T>();
this.executor = executor;
count = new AtomicInteger();
}
public void execute(ExecutorTask task) {
executor.execute(task);
}
public Iterator<T> iterator() {
return new MyIterator();
}
public class MyIterator implements Iterator<T> {
private T current;
public boolean hasNext() {
if (count.get() > 0 && current == null)
{
try {
current = queue.take();
count.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return current != null;
}
public T next() {
final T ret = current;
current = null;
return ret;
}
public void remove() {
throw new UnsupportedOperationException();
}
}
public class ExecutorTask implements Runnable{
private String name;
public ExecutorTask(String name) {
this.name = name;
}
private int random(int n)
{
return (int) Math.round(n * Math.random());
}
@SuppressWarnings("unchecked")
public void run() {
try {
int random = random(500);
Thread.sleep(random);
queue.put((T) (name + ":" + random + ":1"));
queue.put((T) (name + ":" + random + ":2"));
queue.put((T) (name + ":" + random + ":3"));
queue.put((T) (name + ":" + random + ":4"));
queue.put((T) (name + ":" + random + ":5"));
count.addAndGet(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
И код вызова выглядит так:
Executor e = Executors.newFixedThreadPool(2);
ResultExecutor<Result> resultExecutor = new ResultExecutor<Result>(e);
resultExecutor.execute(resultExecutor.new ExecutorTask("A"));
resultExecutor.execute(resultExecutor.new ExecutorTask("B"));
Iterator<Result> iter = resultExecutor.iterator();
while (iter.hasNext()) {
System.out.println(iter.next());
}