Java итератор параллельной работы? - PullRequest
5 голосов
/ 04 августа 2009

Я ищу класс, в котором я могу переопределить метод для выполнения работы и вернуть результаты как итератор. Примерно так:

ParallelWorkIterator<Result> itr = new ParallelWorkIterator<Result>(trials,threads) {

  public Result work() {
    //do work here for a single trial...
    return answer;
  }

};
while (itr.hasNext()) {
  Result result = itr.next();
  //process result...
}

Это в основном будет использоваться для таких вещей, как симуляции Монте-Карло, но я не хочу иметь дело с настройкой пулов потоков и управлением возвращающимися потоками каждый раз. Я прокрутил свой собственный класс, который , надеюсь, выполняет это, но я не достаточно уверен в этом и подумал, что проверю, существовало ли что-то подобное.

Редактировать: чтобы было ясно, я хочу, чтобы он продолжал работать в фоновом режиме и ставил в очередь результаты после каждого рабочего метода, пока все испытания не будут завершены. Поэтому следующий метод может ожидать возврата, пока в очереди не появится результат.

Ответы [ 3 ]

13 голосов
/ 04 августа 2009

Взгляните на ExecutorCompletionService . Он делает все, что вы хотите.

   void solve(Executor e, Collection<Callable<Result>> solvers)
     throws InterruptedException, ExecutionException {
       //This class will hold and execute your tasks
       CompletionService<Result> ecs
           = new ExecutorCompletionService<Result>(e);
       //Submit (start) all the tasks asynchronously
       for (Callable<Result> s : solvers)
           ecs.submit(s);
       //Retrieve completed task results and use them
       int n = solvers.size();
       for (int i = 0; i < n; ++i) {
           Result r = ecs.take().get();
           if (r != null)
               use(r);
       }
   }

Преимущество использования CompletionService заключается в том, что он всегда возвращает первый завершенный результат. Это гарантирует, что вы не ожидаете завершения задач, и позволяет незавершенным задачам работать в фоновом режиме.

2 голосов
/ 04 августа 2009

Я бы порекомендовал посмотреть на Java Executors .

Вы отправляете несколько заданий и получаете для каждого объект Future . Ваша работа обрабатывается в фоновом режиме, и вы перебираете объекты Future (как вы делаете выше). Каждое будущее возвращает результат по мере его появления (вызывая get() - это блокирует, пока результат не будет создан в отдельном потоке)

1 голос
/ 04 августа 2009

Самое близкое, что я могу придумать, это использовать CompletionService для накопления результатов по мере их завершения.

Простой пример:

ExecutorService executor = Executors.newSingleThreadExecutor(); // Create vanilla executor service.
CompletionService<Result> completionService = new ExecutorCompletionService<Result>(executor); // Completion service wraps executor and is notified of results as they complete.
Callable<Result> callable = new MyCallable();

executor.submit(callable); // Do not store handle to Future here but rather obtain from CompletionService when we *know* the result is complete.

Future<Result> fut = completionService.take(); // Will block until a completed result is available.
Result result = fut.get(); // Will not block as we know this future represents a completed result.

Я бы не рекомендовал оборачивать это за интерфейс Iterator, так как метод Future get() может выдать два возможных проверенных исключения: ExecutionException и InterruptedException, и поэтому вам нужно будет перехватить и проглотить или отбросить их как RuntimeException с, но это не очень хорошая вещь. Кроме того, ваши Iterator hasNext() или next() методы потенциально должны были бы блокироваться, если выполнялась задача, что можно было бы считать нелогичным для клиентов, использующих Iterator. Вместо этого я бы реализовал свой собственный более описательный интерфейс; например,

public interface BlockingResultSet {
  /**
   * Returns next result when it is ready, blocking is required.
   * Returns null if no more results are available.
   */  
  Result take() throws InterruptedException, ExecutionException;
}

(Методы, называемые take(), обычно представляют блокирующий вызов в пакете java.util.concurrent).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...