Параллельное выполнение вызовов - PullRequest
3 голосов
/ 14 июня 2011

Я бы хотел выполнить несколько вызовов параллельно.Но кажется, что ExecutorService всегда ждет, пока все вызовы не будут завершены.

Я пробовал следующее:

final int nThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
List<PrimeCallable> tasks = new ArrayList<PrimeCallable>();
for(int i = 0; i < nThreads; i++) {
    tasks.add(new PrimeCallable(0, i * 100 + 100, "thread" + i));
}

try {
    for(Future<List<Integer>> result : executorService.invokeAll(tasks)) {
        List<Integer> integers = result.get();
        for(Integer i : integers){
            System.out.println(i);
        }
    }
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

Теперь цикл for вызывается, когда все вызываемые объекты в executorServicefinnished.Насколько я знаю, нет executorService.isParallel setter; -).

Каков правильный подход, чтобы позволить вызовам работать параллельно?

Спасибо за ваши подсказки!

Ответы [ 4 ]

9 голосов
/ 14 июня 2011

Javadocs для invokeAll говорит;

Выполняет заданные задачи, возвращая список Фьючерсов с их статусом и результатами , когда все завершено .Future.isDone () имеет значение true для каждого элемента возвращаемого списка.

Таким образом, invokeAll блокируется до тех пор, пока не завершится каждая задача в коллекции.

5 голосов
/ 14 июня 2011

Служба Executor запускает все ваши вызовы параллельно.Все, что он делает, - это ждет завершения всех параллельных задач, прежде чем двигаться дальше.Так что не все, где все задачи запускаются последовательно.

3 голосов
/ 08 сентября 2011

Звучит как часть того, что вы хотите: ленивое выполнение - вам не нужно делать копию структуры в памяти перед извлечением результатов.

Я бы отнессяэто как итерация + проблема преобразования.Сначала определите итератор для вашего ввода, чтобы каждый вызов next () возвращал Callable, который будет генерировать следующее значение в вашей серии.

Этап преобразования заключается в применении параллельной или параллельной оценки этих Callables., что-то вроде этого (не проверено):

public class ConcurrentTransform
{
  private final ExecutorService executor;
  private final int maxBuffer;

  public ConcurrentTransform(ExecutorService executor, int maxWorkBuffer) {
    this.executor = executor;
    this.maxBuffer = Math.max(1, maxWorkBuffer);
  }

  public <T> Iterator<T> apply(final Iterator<Callable<T>> input) {
    // track submitted work
    final BlockingQueue<Future<T>> submitted = new LinkedBlockingQueue<Future<T>>();

    // submit first N tasks
    for (int i=0; i<maxBuffer && input.hasNext(); i++) {
      Callable<T> task = input.next();
      Future<T> future = executor.submit(task);
      submitted.add(future);
    }

    return new Iterator<T>(){
      @Override
      public synchronized boolean hasNext() {
        return !submitted.isEmpty();
      }
      @Override
      public T next() {
        Future<T> result;
        synchronized (this) {
          result = submitted.poll();
          if (input.hasNext()) {
            submitted.add(executor.submit(input.next()));
          }
        }

        if (result != null) {
          try {
            return result.get(); // blocking
          } catch (Exception e) {
            if (e instanceof RuntimeException) {
               throw (RuntimeException) e;
            } else {
               throw new RuntimeException(e);
            }
          }
        } else {
          throw new NoSuchElementException();
        }
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }};
  }
}

После вызова apply (...) вы переберите итоговые значения, которые под прикрытием будут выполнять объекты Callable параллельно и возвращатьрезультаты в том же порядке, в котором они были введены.Некоторые усовершенствования могут включать дополнительное время ожидания для блокирующего вызова result.get () или управление пулом потоков внутри самого преобразования.

2 голосов
/ 14 июня 2011

Если вы хотите просмотреть результаты, как они есть, используйте ExecutorCompletionService.

...