Основная проблема
Проблема в том, что вы вытягиваете все записи из базы данных в локальную память перед отправкой их в s3Sink
.
Первое место, где данные извлекаются в память, вероятно, в вашем MyRepository.getAllRecords()
методе.Большинство, если не все, реализации Seq
основаны на оперативной памяти.Второе место, где вы определенно используете локальную память, находится в seek.toList
, поскольку List
хранит все данные в памяти.
Решение
Вместо того, чтобы возвращать Seq
из getAllRecords
, вы должны возвращать аккуратную акку Source
напрямую .Это гарантирует, что вашему материализованному потоку потребуется только память для шагов обработки переходных процессов, прежде чем перейти к s3.
Если определение вашего метода изменилось на:
def getAllRecords() : Source[MyEntityImpl, _]
Тогда остальная часть потока будет работать реактивно:
MyRepository
.getAllRecords()
.map(myEntity => myEntity.toPSV + "\n")
.map(ByteString.apply)
.runWith(s3Sink)