Недостаточно памяти при загрузке большого количества записей из базы данных - PullRequest
0 голосов
/ 19 февраля 2019

Я использую slick в Akka Streams для загрузки большого количества записей (~ 2M) из базы данных (postgresql) и записи их в файл S3.Тем не менее, я заметил, что мой код, приведенный ниже, работает для записей около ~ 50 тыс., Но не работает для отметки более 100 тыс.

  val allResults: Future[Seq[MyEntityImpl]] =
    MyRepository.getAllRecordss()

  val results: Future[MultipartUploadResult] = Source
    .fromFuture(allResults)
    .map(seek => seek.toList)
    .mapConcat(identity)
    .map(myEntity => myEntity.toPSV + "\n")
    .map(s => ByteString(s))
    .runWith(s3Sink)

Ниже приведен пример того, как выглядит myEntity:

case class MyEntityImpl(partOne: MyPartOne, partTwo: MyPartTwo) {
  def toPSV: String = myPartOne.toPSV + myPartTwo.toPSV
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = {s"$field1|"+s"$field2"}
}
case class MyPartOne(field1: String, field2: String) {
  def toPSV: String = {s"$field1|"+s"$field2"}
}

Я ищу способ сделать это более реактивным образом, чтобы он неисчерпать память.

1 Ответ

0 голосов
/ 19 февраля 2019

Основная проблема

Проблема в том, что вы вытягиваете все записи из базы данных в локальную память перед отправкой их в s3Sink.

Первое место, где данные извлекаются в память, вероятно, в вашем MyRepository.getAllRecords() методе.Большинство, если не все, реализации Seq основаны на оперативной памяти.Второе место, где вы определенно используете локальную память, находится в seek.toList, поскольку List хранит все данные в памяти.

Решение

Вместо того, чтобы возвращать Seq из getAllRecords , вы должны возвращать аккуратную акку Source напрямую .Это гарантирует, что вашему материализованному потоку потребуется только память для шагов обработки переходных процессов, прежде чем перейти к s3.

Если определение вашего метода изменилось на:

def getAllRecords() : Source[MyEntityImpl, _]

Тогда остальная часть потока будет работать реактивно:

MyRepository
  .getAllRecords()
  .map(myEntity => myEntity.toPSV + "\n")
  .map(ByteString.apply)
  .runWith(s3Sink)
...