У меня есть поток, который читает данные следующим образом:
id | color | shade | 5
1 | red | light
2 | green | dark
3 | blue | light
4 | grey | light
Мне нужно прочитать первую строку, получить целое число (5) в этом случае, затем подсчитать оставшиеся строки и выяснить,(true/false
) соответствует ли количество.В этом случае 5 не соответствует 4, поэтому это будет false
.
Я делаю это прямо сейчас, что хорошо работает с некоторыми данными, но начинает выдавать ошибки и дает мне OOM на больших потоках (более 1М записей).Это то, что я делаю
class FirstLine (totalCount: Int)
class ColorLine (id: Int, name: String, shade: String)
class Everything(firstLine: firstLine, List[ColorLine] colors)
val headerResult: Future[FirstLine] =
myRawStr(ctx)
.take(1)
.via(framing("\n"))
.map(_.utf8String)
.map(_.trim)
.map(s => FirstLineParser(s))
.collect {
case Right(fl) => fl
}
.runWith(Sink.head)
val restResult: Future[immutable.Seq[ColorLine]] =
myRawStr(ctx)
.drop(1)
.via(framing("\n"))
.map(_.utf8String)
.map(_.trim)
.map(s => ColorLineParser(s))
.collect {
case Right(color) => color
}
.runWith(Sink.seq)
def validateAndError(everyThing: Everything): Future[List[MyError]] =
validate(everyThing) match {
case Left(errors: Seq[MyError]) =>
val persisted: Future[ValidatedError] = ctx.asScala.self ? (
(ref: ActorRef[ValidatedError]) =>
PersistError(someId,Some(ref)))
persisted.map(_ => errors)
case Right(_) =>
Future.successful(Nil)
}
for {
header <- headerResult
rest <- restResult
res <- validateAndError(Everything(header, rest)
} yield res
Вопрос
Есть ли способ повысить эффективность приведенного выше кода, чтобы он работал для более 1М записей?