Одновременный запуск параллельной обработки в Akka Streams - PullRequest
0 голосов
/ 04 августа 2020

Я пытаюсь проделать трюк с Akka Streams, при котором пакет элементов будет обрабатываться одновременно. Я заметил, что даже если вы создадите Balance и загрузите его последовательностью, он начнет выполнение для каждого элемента, как только он попадет в поток. порог, а затем одновременно запустить параллельное выполнение? Можно ли это сделать с помощью инструментов Akka Streams, или, может быть, потребуется кодирование параллелизма java / scala?

1 Ответ

1 голос
/ 04 августа 2020

У вас есть несколько вариантов.

Существует целое семейство функций группировки grouped(Int), groupWithin(Int, FiniteDuration), и c, которые вы можете использовать для создания коллекции элементов, генерируемых до тех пор, пока не будет достигнут некоторый порог. заполнено и / или в пределах временного окна, et c. Как только у вас есть этот пакет, вы можете mapAsync его, и там вы можете использовать некоторый детальный контроль над Future, например, вы можете создать Future для каждого элемента, объединить их с Future.sequence и отобразить результат параллельных операций.

stream
  .grouped(10)
  .mapAsync(1) { collection =>
     // create future processing all values in collection at once
  }

Если у вас нет проблем с обработкой более чем одного пакета одновременно, вы можете увеличить параллелизм в mapAsync. Если вам не нужно каким-либо образом комбинировать сгруппированные значения, возможно, для ваших нужд будет достаточно mapAsync с более высоким параллелизмом (или mapAsyncUnordered).

Вы должны помнить, что эти значения в обоих grouped и в mapAsyn c должны быть настроены разумно, потому что, например, если вы попытаетесь сгруппировать 1M элементов, вы можете столкнуться с ошибками OOM.

...