Подписка на поток Akka, чтение файлов - PullRequest
0 голосов
/ 25 июня 2019

Я пытаюсь прочитать файл в Scala с помощью Akka Streams, я хочу поместить результат в список. Я попытался следующий код, и список увеличивается с новыми значениями внутри приемника, но снаружи я получаю пустой список.

def readStream (path : String, date : String) : List[Array[String]] = {
  var lines: List[scala.Array[String]] = List[scala.Array[String]]()

  implicit val system = ActorSystem("Sys")
  val settings = ActorMaterializerSettings(system)
  implicit val materializer = ActorMaterializer(settings)

  val sink: Sink[String, Future[Done]] = Sink.foreach((x : String) => {
    val list : List[scala.Array[String]] = List(x.split("|"))
    lines = lines ++ list
    // println(lines.length)
  })

  val result: Unit = FileIO.fromPath(Paths.get(path + "transactions_" + date + ".data"))
    .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
    .to(sink)
    .run()
  lines
}

1 Ответ

3 голосов
/ 25 июня 2019

Три вещи: (1) передать систему субъекта и материализатор в ваш метод (явным образом или в виде неявных параметров) вместо создания их внутри метода, (2) использовать Sink.seq и (3) использовать toMat иKeep.right для получения материализованного значения Sink (to сохраняет материализованное значение Source):

val result: Future[Seq[String]] =
  FileIO.fromPath(...)
    .via(Framing.delimiter(ByteString("\n"), 256, true))
    .map(_.utf8String)
    .toMat(Sink.seq)(Keep.right)
    .run()

В качестве альтернативы, сокращение для использования toMat и Keep.rightis runWith:

val result: Future[Seq[String]] =
  FileIO.fromPath(...)
    .via(Framing.delimiter(ByteString("\n"), 256, true))
    .map(_.utf8String)
    .runWith(Sink.seq)
...