Как читать большой поток и сравнивать счетчики? - PullRequest
0 голосов
/ 25 декабря 2018

У меня есть поток, который читает данные следующим образом:

id | color | shade | 5
1 | red | light
2 | green | dark
3 | blue | light
4 | grey | light

Мне нужно прочитать первую строку, получить целое число (5) в этом случае, затем подсчитать оставшиеся строки и выяснить,(true/false) соответствует ли количество.В этом случае 5 не соответствует 4, поэтому это будет false.

Я делаю это прямо сейчас, что хорошо работает с некоторыми данными, но начинает выдавать ошибки и дает мне OOM на больших потоках (более 1М записей).Это то, что я делаю

class FirstLine (totalCount: Int)
class ColorLine (id: Int, name: String, shade: String)
class Everything(firstLine: firstLine, List[ColorLine] colors)

    val headerResult: Future[FirstLine] =
      myRawStr(ctx)
        .take(1)
        .via(framing("\n"))
        .map(_.utf8String)
        .map(_.trim)
        .map(s => FirstLineParser(s))
        .collect {
          case Right(fl) => fl
        }
        .runWith(Sink.head)

    val restResult: Future[immutable.Seq[ColorLine]] =
      myRawStr(ctx)
        .drop(1)
        .via(framing("\n"))
        .map(_.utf8String)
        .map(_.trim)
        .map(s => ColorLineParser(s))
        .collect {
          case Right(color) => color
        }
        .runWith(Sink.seq)

    def validateAndError(everyThing: Everything): Future[List[MyError]] =
      validate(everyThing) match {
        case Left(errors: Seq[MyError]) =>
          val persisted: Future[ValidatedError] = ctx.asScala.self ? (
              (ref: ActorRef[ValidatedError]) =>
                PersistError(someId,Some(ref)))
          persisted.map(_ => errors)

        case Right(_) =>
          Future.successful(Nil)
      }

    for {
      header <- headerResult
      rest <- restResult
      res <- validateAndError(Everything(header, rest)
    } yield res

Вопрос

Есть ли способ повысить эффективность приведенного выше кода, чтобы он работал для более 1М записей?

1 Ответ

0 голосов
/ 31 декабря 2018

Наиболее эффективным способом было бы не собирать restResult как Seq[ColorLine], а вместо этого просто производить подсчет цветных линий в результате:

type Count = Long

val zeroCount : Count = 0L

val countColorLine : (Count, ColorLine) => Count = 
  (count, _) => count + 1

val restResultCount: Future[Count] =
  myRawStr(ctx)
    .drop(1)
    .via(framing("\n"))
    .map(_.utf8String)
    .map(_.trim)
    .map(s => ColorLineParser(s))
    .collect {
      case Right(color) => color
    }
    .runFold(zeroCount)(countColorLine)  
...