Чтение нескольких файлов с помощью потоков akka в scala - PullRequest
0 голосов
/ 26 июня 2019

Я пытаюсь прочитать несколько файлов с помощью потоков akka и поместить результат в список. Я могу прочитать один файл без проблем. тип возвращаемого значения - Future [Seq [String]]. проблема заключается в обработке последовательности внутри будущего должно идти внутри onComplete {}.

Я пробую следующий код, но, очевидно, он не будет работать. список acc вне onComplete пуст. но содержит значения внутри inComplete. Я понимаю проблему, но я не знаю, как подойти к этому.

// works fine  
def readStream(path: String, date: String): Future[Seq[String]] = {
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)

val result: Future[Seq[String]] =
  FileIO.fromPath(Paths.get(path + "transactions_" + date + 
".data"))
    .via(Framing.delimiter(ByteString("\n"), 256, true))
    .map(_.utf8String)
    .toMat(Sink.seq)(Keep.right)
    .run()
 var aa: List[scala.Array[String]] = Nil
 result.onComplete(x => {
  aa = x.get.map(line => line.split('|')).toList
})
 result
}

//this won't work  
def concatFiles(path : String, date : String, numberOfDays : Int) : 
List[scala.Array[String]] = {
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
val formattedDate = LocalDate.parse(date, formatter);
var acc = List[scala.Array[String]]()

for( a <- 0 to numberOfDays){
  val date = formattedDate.minusDays(a).toString().replace("-", "")


  val transactions = readStream(path , date)
  var result: List[scala.Array[String]] = Nil
  transactions.onComplete(x => {
    result = x.get.map(line => line.split('|')).toList 
    acc=  acc ++ result })
}
acc}

1 Ответ

1 голос
/ 26 июня 2019

Общее решение

При заданном итераторе Paths значений Source строк файла можно создать, комбинируя FileIO & flatMapConcat:

val lineSourceFromPaths : (() => Iterator[Path]) => Source[String, _] = pathsIterator =>
  Source
    .fromIterator(pathsIterator)
    .flatMapConcat { path =>
      FileIO
        .fromPath(path)
        .via(Framing.delimiter(ByteString("\n"), 256, true))
        .map(_.utf8String)
    }

Приложение к вопросу

Причина, по которой ваше List пусто, состоит в том, что значения Future не заполнены, и поэтому ваш изменяемый список не будет обновлен до того, как функция вернет список.

Критика кода в вопросе

Организация и стиль кода в вопросе предполагают несколько недоразумений, связанных с akka & Future. Я думаю, что вы пытаетесь довольно сложный рабочий процесс без понимания основ инструментов, которые вы пытаетесь использовать.

1.Не следует создавать ActorSystem каждый раз, когда вызывается функция. Обычно на 1 приложение приходится 1 ActorSystem, и оно создается только один раз.

implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)

def readStream(...

2.Вы должны стараться избегать изменчивых коллекций и вместо этого использовать Iterator с соответствующей функциональностью:

def concatFiles(path : String, date : String, numberOfDays : Int) : List[scala.Array[String]] = {

  val formattedDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))

  val pathsIterator : () => Iterator[Path] = () => 
    Iterator
      .range(0, numberOfDays+1)
      .map(formattedDate.minusDays)
      .map(_.String().replace("-", "")
      .map(path => Paths.get(path + "transactions_" + date + ".data")

  lineSourceFromPaths(pathsIterator)

3. Поскольку вы имеете дело с фьючерсами, вам не следует ждать завершения фьючерса, а вместо этого следует изменить тип возврата с concateFiles на Future[List[Array[String]]].

...