Обработка списка объектов, использующих завершаемые фьючерсы - PullRequest
1 голос
/ 16 апреля 2019

У меня есть список объектов типа T.У меня также есть функциональный интерфейс, который действует как Supplier, который имеет метод performTask для сущности и отправляет обратно результат R, который выглядит следующим образом:
R performTask(T entity) throws Exception.

Я хочуОтфильтруйте оба: успешные результаты и ошибки и исключения, выходящие из него на отдельных картах.Код, который я здесь написал, требует времени. Пожалуйста, предложите, что можно сделать.

Я зацикливаюсь на списке сущностей, а затем обрабатываю их завершаемое будущее по очереди, что, на мой взгляд, не является правильным способом.,Можете ли вы все предложить, что можно сделать здесь?

private void updateResultAndExceptionMaps(List < T > entities, final TaskProcessor < T, R > taskProcessor) {

 ExecutorService executor = createExecutorService();
 Map < T, R > outputMap = Collections.synchronizedMap(new HashMap < T, R > ());
 Map < T, Exception > errorMap = new ConcurrentHashMap < T, Exception > ();
 try {

  entities.stream()
   .forEach(entity -> CompletableFuture.supplyAsync(() -> {
     try {
      return taskProcessor.performTask(entity);
     } catch (Exception e) {
      errorMap.put(entity, (Exception) e.getCause());
      LOG.error("Error processing entity Exception: " + entity, e);
     }
     return null;
    }, executor)
    .exceptionally(throwable -> {
     errorMap.put(entity, (Exception) throwable);
     LOG.error("Error processing entity Throwable: " + entity, throwable);
     return null;
    })
    .thenAcceptAsync(R -> outputMap.put(entity, R))
    .join()
   ); // end of for-each 

  LOG.info("outputMap Map -> " + outputMap);
  LOG.info("errorMap Map -> " + errorMap);
 } catch (Exception ex) {
  LOG.warn("Error: " + ex, ex);
 } finally {
  executor.shutdown();
 }
}

outputmap должен содержать сущность и результат, R.
errorMap должен содержать сущность и Exception.

1 Ответ

0 голосов
/ 16 апреля 2019

Это потому, что вы перебираете по List сущностей одну за другой, создаете объект CompletableFuture и немедленно блокируете итерацию из-за метода join, который ожидает, пока данный процессор не завершит свою работу или не сгенерирует исключение.Вы можете сделать это с полной поддержкой многопоточности, преобразовав каждую сущность в CompletableFuture, собрав все CompletableFuture экземпляров, и после этого дождитесь, пока все вызовут join для каждого.

Ниже код должен помочь вамcase:

entities.stream()
    .map(entity -> CompletableFuture.supplyAsync(() -> {
            try {
                return taskProcessor.performTask(entity);
            } catch (Exception e) {
                errorMap.put(entity, (Exception) e.getCause());
            }
            return null;
        }, executor)
                .exceptionally(throwable -> {
                    errorMap.put(entity, (Exception) throwable);
                    return null;
                })
                .thenAcceptAsync(R -> outputMap.put(entity, R))
    ).collect(Collectors.toList())
    .forEach(CompletableFuture::join);
...