Как я могу реализовать или найти эквивалент многопоточного CompletionService? - PullRequest
2 голосов
/ 16 февраля 2011

У меня есть простой веб-сервис, работающий внутри контейнера Tomcat, который по своей природе является многопоточным. В каждом запросе, поступающем в сервис, я хочу совершать параллельные звонки на внешний сервис. Служба ExecutorCompletionService в java.util.concurrent частично помогает мне в этом. Я могу предоставить ему пул потоков, и он позаботится о выполнении моих одновременных вызовов, и я получу уведомление, когда будет готов любой из результатов.

Код для обработки определенного входящего запроса может выглядеть следующим образом:

void handleRequest(Integer[] input) {
    // Submit tasks
    CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(Executors.newCachedThreadPool());
    for (final Integer i : input) {
        completionService.submit(new Callable<Integer>() {
            public Integer call() {
                return -1 * i;
            }
        });
    }

    // Do other stuff...

    // Get task results
    try {
        for (int i = 0; i < input.size; i++) {
            Future<Integer> future = completionService.take();
            Integer result = future.get();
            // Do something with the result...
        }
    } catch (Exception e) {
        // Handle exception
    }
}

Это должно работать нормально и модно, но довольно неэффективно, поскольку для каждого входящего запроса выделяется новый пул потоков. Если я перенесу CompletionService в качестве общего экземпляра, я столкнусь с проблемами безопасности потоков с несколькими запросами, использующими один и тот же CompletionService и пул потоков. По мере того как запросы отправляют задачи и получают результаты, результаты, которые они получают, не совпадают с теми, которые они отправили.

Таким образом, мне нужен поточно-ориентированный CompletionService, который позволяет мне использовать общий пул потоков для всех входящих запросов. Когда каждый поток завершает задачу, соответствующий поток для входящего запроса должен быть уведомлен, чтобы он мог собрать результаты.

Какой самый простой способ реализовать такую ​​функциональность? Я уверен, что этот шаблон был применен много раз; Я просто не уверен, что это что-то, предоставленное библиотекой параллелизма Java, или его можно легко построить с использованием некоторых строительных блоков параллелизма Java.

ОБНОВЛЕНИЕ: Одно замечание, которое я забыл упомянуть, это то, что я хотел бы получать уведомления, как только выполнится любая из представленных мной задач. Это основное преимущество использования CompletionService, поскольку оно разделяет производство и потребление задач и результатов. На самом деле меня не волнует порядок, в котором я получаю результаты обратно, и я бы хотел избежать ненужной блокировки во время ожидания возврата результатов по порядку.

Ответы [ 4 ]

2 голосов
/ 16 февраля 2011

Вы разделяете Executor, но не CompletionService.

У нас есть AsyncCompleter , который делает именно это и обрабатывает всю бухгалтерию, позволяя:

Iterable<Callable<A>> jobs = jobs();
Iterable<A> results async.invokeAll(jobs);

results повторяется в порядке возврата и блокируется, пока не станет доступен результат

1 голос
/ 16 февраля 2011

java.util.concurrent предоставляет все, что вам нужно. Если я правильно понимаю ваш вопрос, у вас есть следующие требования:

Вы хотите отправить запрос и немедленно (в пределах разумного) обработать результат запроса (Ответ). Ну, я полагаю, что вы уже видели решение вашей проблемы: java.util.concurrent.CompletionService.

Этот сервис, который довольно просто комбинирует Executor и BlockingQueue для обработки задач Runnable и / или Callable. BlockingQueue используется для хранения выполненных задач, в результате чего другой поток может ожидать, пока завершенная задача не будет поставлена ​​в очередь (это достигается путем вызова take ()) для объекта CompletionService.

Как уже упоминалось в предыдущих постерах, предоставьте общий доступ к Executor и создайте CompletionService для каждого запроса. Это может показаться дорогостоящим делом, но учтите еще раз, что CS просто сотрудничает с Исполнителем и BlockingQueue. Поскольку вы делите самый дорогой объект для создания экземпляра, то есть исполнителя, я думаю, вы обнаружите, что это очень разумная цена.

Однако ... несмотря на это, кажется, у вас все еще есть проблема, и эта проблема, похоже, заключается в отделении обработки Запросов от обработки Ответов. Это может быть достигнуто путем создания отдельной службы, которая обрабатывает исключительно ответы для всех запросов или для определенного типа запроса.

Вот пример: (Примечание: подразумевается, что объект Request реализует интерфейс Callable, который должен возвращать тип Response ... подробности, которые я пропустил в этом простом примере).

class RequestHandler {

  RequestHandler(ExecutorService responseExecutor, ResponseHandler responseHandler) {      
    this.responseQueue = ...
    this.executor = ...
  }  

  public void acceptRequest(List<Request> requestList) {

    for(Request req : requestList) {

      Response response = executor.submit(req);
      responseHandler.handleResponse(response);

    }  
  }  
}

class ResponseHandler {
  ReentrantLock lock;
  ResponseHandler(ExecutorService responseExecutor) {
    ...
  }

  public void handleResponse(Response res) {
    lock.lock() {
    try {
      responseExecutor.submit( new ResponseWorker(res) );
    } finally {
      lock.unlock();
    }    
  }

  private static class ResponseWorker implements Runnable {

    ResponseWorker(Response response) {
      response = ...
    }

    void processResponse() {         
      // process this response 
    }

    public void run() {      
      processResponse();      
    }  
  }
}

Несколько вещей, которые нужно запомнить: во-первых, ExecutorService выполняет Callables или Runnables из очереди блокировки; ваш RequestHandler получает задачи, которые ставятся в очередь на исполнителя и обрабатываются как можно скорее. То же самое происходит в вашем ResponseHandler; ответ получен, и как только этот отдельный исполнитель сможет, он обработает этот ответ. Короче говоря, у вас одновременно работают два исполнителя: один на объектах запроса, другой на объектах ответа.

1 голос
/ 16 февраля 2011

Вы можете просто использовать обычный общий сервис ExecutorService. Всякий раз, когда вы отправляете задание, вы получаете будущее для задания, которое вы только что отправили. Вы можете сохранить их все в списке и запросить их позже.

Пример:

private final ExecutorService service = ...//a single, shared instance

void handleRequest(Integer[] input) {
    // Submit tasks
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>(input.length);
    for (final Integer i : input) {
        Future<Integer> future = service.submit(new Callable<Integer>() {
            public Integer call() {
                return -1 * i;
            }
        });
        futures.add(future);
    }

    // Do other stuff...

    // Get task results
    for(Future<Integer> f : futures){
        try {
            Integer result = f.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
0 голосов
/ 16 февраля 2011

Зачем тебе CompletionService?

Каждый поток может просто отправить или вызвать Callables в «обычном» и совместно используемом экземпляре ExecutorService. Затем каждый поток хранит свои собственные Future ссылки.

Также, Executor и его потомки являются поточно-ориентированными. На самом деле вы хотите, чтобы каждый поток мог создавать свои собственные задачи и проверять их результаты.

Javadoc в java.util.concurrent превосходен; это включает образцы использования и примеры. Прочитайте документацию по ExecutorService и другим типам, чтобы лучше понять, как их использовать.

...