Как минимизировать использование памяти в потоках akka - PullRequest
2 голосов
/ 27 февраля 2020

У меня есть поток, который в какой-то момент сгруппирует объекты для создания файлов. Я думаю, что могу сжать несколько байтов, сериализовав объект в начале потока. Но мой самый большой вопрос о том, как оптимизировать объем памяти для такого потока:

val sourceOfCustomer = Source.repeat(Customer(name = "test"))
def serializeCustomer(customer: Customer) = customer.toString

sourceOfCustomers
.via(serializeCustomer) // 1KB
.grouped(1000000) // 1GB
.via(processFile) // 1GB
.via(moreProcessing) // 1GB
.via(evenMoreProcessing) // 1GB
.to(fileSink) // 1GB

Это дает мне использование памяти в устойчивом состоянии , по крайней мере, 5 ГБ . Это правильно?

Какую стратегию я могу использовать, чтобы ограничить ее только 1 или 2 ГБ? В принципе это должно быть возможно путем свертывания операторов.

Примечание: я знаю, что решение состоит в том, чтобы сделать группу меньше, но давайте рассмотрим размер группы как ограничение проблемы.

1 Ответ

4 голосов
/ 27 февраля 2020

Извините, может быть, я что-то упустил, но я не нашел group операцию в последней документации Akka Stream, думаю, вы имеете в виду grouped операция: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/grouped.html.

Если это так, то это означает, что на .grouped(1000000) // 1GB вы создаете группу элементов в потоке, которые могут обрабатываться одновременно, следовательно, одна или более чем одна группа размером 1 ГБ может присутствовать в памяти в один момент времени. Таким образом, чтобы ограничить объем памяти в вашем потоке до 1 ГБ, вы можете go одним из двух способов:

1) Сократить количество одновременных обработки больших групп. Этого можно достичь с помощью операции throttle: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/throttle.html#throttle См., Например, фрагмент кода

import scala.concurrent.duration._
...

.group(1000000) // 1GB
.throttle(1, 1 minute)

2) Уменьшение размера большой группы

val parallelismLevel = Runtime.getRuntime.availableProcessors() // or another custom level which represents stream processing parallelism
val baseGroupSize = 1000000 // 1GB
val groupSize =  baseGroupSize / parallelismLevel 

sourceOfCustomers
.via(serializeCustomer) // 1KB
.group(groupSize)

Надеюсь, это поможет!

...