Объединить несколько параллельных объектов в один список - PullRequest
0 голосов
/ 11 мая 2018

У меня есть карта ключ-значение, итерации по ключам, вызов службы и, основываясь на ответе, я добавляю весь ответ к некоторому uberList

Как я могу одновременно выполнять различные операции? Поможет ли изменение stream() на parallelStream()? Синхронизируется ли оно, когда добавляется к uberList?

Идея состоит в том, чтобы минимизировать время отклика.

List<MyClass> uberList = new LinkedList<>();

Map<String, List<MyOtherClass>> map = new HashMap();

//Populate map

map.entrySet().stream().filter(s -> s.getValue().size() > 0 && s.getValue().values().size() > 0).forEach(
   y  -> {

        // Do stuff
        if(noError) {
            uberList.add(MyClass3);
        }

   }

}  

//Do stuff on uberList

Ответы [ 4 ]

0 голосов
/ 12 мая 2018

Как я могу одновременно выполнять различные операции?

Один поток может выполнять одну задачу за раз. Если вы хотите выполнять несколько операций одновременно, вам нужно переключиться на другие потоки. Вы можете создать новый поток или использовать ExecutorService для управления пулом потоков, поставить задачу в очередь и выполнить задачу за вас.

Поможет ли изменение stream () наrallelStream ()?

Да, это так. Внутренне parallelStream() использует ForkJoinPool.commonPool() для запуска задач для вас. Но имейте в виду, что parallelStream() не имеет никаких гарантий относительно параллельности возвращаемого потока (но пока текущая реализация возвращает параллельный)

Синхронизируется ли оно при добавлении в uberList?

Вы должны выполнить часть синхронизации в forEach конвейере. Обычно вы не хотите вызывать collection.add() внутри forEach для создания коллекции. Вместо этого вы должны использовать .map().collect(toX()) методы. Это освобождает вас от синхронизирующей части:

  1. Не требуется знать о вашей локальной переменной (в данном случае uberlist. И она не изменит ее при выполнении, поможет уменьшить множество странных ошибок, вызванных параллелизмом
  2. Вы можете свободно изменять тип коллекции в .collect() части. Это дает вам больше контроля над типом результата.
  3. При использовании с параллельным потоком не требует поточно-ориентированной обработки или синхронизации для заданной коллекции. Потому что «несколько промежуточных результатов могут быть созданы, заполнены и объединены для обеспечения изоляции изменяемых структур данных» ( Подробнее об этом здесь )

Итак, вам нужно выполнить несколько одинаковых вызовов службы одновременно и собрать свой результат в список.

Вы можете сделать это просто параллельным потоком:

uberList = map.entrySet().stream()
                         .parallel()  // Use .stream().parallel() to force parallism. The .parallelStream() does not guarantee that the returned stream is parallel stream
                         .filter(yourCondition)
                         .map(e -> yourService.methodCall(e))
                         .collect(Collectors.toList());

Довольно круто, не правда ли?

Но, как я уже говорил, в параллельном потоке по умолчанию используется ForkJoinPool.commonPool() для организации очередей и выполнения потоков.

Плохая часть, если ваши yourService.methodCall(e) выполняют тяжелые операции ввода-вывода (например, HTTP-вызов, даже вызов DB) ... или длительное выполнение задачи , тогда это может исчерпать пул другие входящие задачи будут стоять в очереди навсегда, ожидая выполнения.

Таким образом, обычно все другие задачи зависят от этого общего пула (не только ваш yourService.methodCall(e), но и весь другой параллельный поток) будет замедляться из-за времени ожидания.

Чтобы решить эту проблему, вы можете принудительно выполнить параллелизм в своем собственном пуле fork-join:

ForkJoinPool forkJoinPool = new ForkJoinPool(4); // Typically set it to Runtime.availableProcessors()
uberlist = forkJoinPool.submit(() -> {
     return map.entrySet().stream()
                             .parallel()  // Use .stream().parallel() to force parallism. The .parallelStream() does not guarantee that the returned stream is parallel stream
                             .filter(yourCondition)
                             .map(e -> yourService.methodCall(e))
                             .collect(Collectors.toList());
}).get();
0 голосов
/ 11 мая 2018

Параллельный поток поможет в исполнении одновременно.Но не рекомендуется делать цикл forEach и добавлять элемент во внешний список.Если вы сделаете это, вы должны убедиться в синхронизации внешнего списка.Лучший способ сделать это - использовать карту и собрать результат в список.В этом случае parallelStream заботится о синхронизации.

List<MyClass> uberList = map.entrySet().parallelStream().filter(s -> 
s.getValue().size() > 0 && s.getValue().values().size() > 
0).map(
y  -> {
    // Do stuff
        return MyClass3;
    }
 }
.filter(t -> check no ertor condition)
.collect (Collectors.toList())
0 голосов
/ 11 мая 2018

Возможно, вы не хотите использовать parallelStream для параллелизма, только для параллелизма. (То есть: используйте его для задач, где вы хотите эффективно использовать несколько физических процессов для задачи, которая является концептуально последовательной, а не для задач, в которых вы хотите, чтобы несколько вещей происходили одновременно концептуально.)

В вашем случае вам, вероятно, было бы лучше использовать ExecutorService или, точнее, com.google.common.util.concurrent.ListenableExecutorService из Google Guava (предупреждение: я не пытался скомпилировать приведенный ниже код, могут быть синтаксические ошибки):

int MAX_NUMBER_OF_SIMULTANEOUS_REQUESTS = 100;
ListeningExecutorService myExecutor = 
    MoreExecutors.listeningDecorator(
        Executors.newFixedThreadPool(MAX_NUMBER_OF_SIMULTANEOUS_REQUESTS));

List<ListenableFuture<Optional<MyClass>>> futures = new ArrayList<>();
for (Map.Entry<String, List<MyOtherClass>> entry : map.entrySet()) {
  if (entry.getValue().size() > 0 && entry.getValue().values().size() > 0) {
    futures.add(myExecutor.submit(() -> {    
      // Do stuff
      if(noError) {
        return Optional.of(MyClass3);
      } else {
        return Optional.empty();
      }
    }));
  }
}

List<MyClass> uberList = Futures.successfulAsList(futures)
    .get(1, TimeUnit.MINUTES /* adjust as necessary */)
    .stream()
    .filter(Optional::isPresent)
    .map(Optional::get)
    .collect(Collectors.toList());

Преимущество этого кода в том, что он позволяет вам явно указывать, что все задачи должны начинаться в «одно и то же время» (по крайней мере, концептуально), и позволяет вам явно контролировать свой параллелизм (сколько разрешено одновременных запросов? мы делаем, если некоторые из задач терпят неудачу? Как долго мы готовы ждать? и т. д.). Параллельные потоки на самом деле не для этого.

0 голосов
/ 11 мая 2018

Я полагаю, что вы можете использовать параллельный поток, синхронизированную карту и список, чтобы делать то, что вам нужно.

...