Общее решение
При заданном итераторе Paths
значений Source
строк файла можно создать, комбинируя FileIO
& flatMapConcat
:
val lineSourceFromPaths : (() => Iterator[Path]) => Source[String, _] = pathsIterator =>
Source
.fromIterator(pathsIterator)
.flatMapConcat { path =>
FileIO
.fromPath(path)
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
}
Приложение к вопросу
Причина, по которой ваше List
пусто, состоит в том, что значения Future
не заполнены, и поэтому ваш изменяемый список не будет обновлен до того, как функция вернет список.
Критика кода в вопросе
Организация и стиль кода в вопросе предполагают несколько недоразумений, связанных с akka
& Future
. Я думаю, что вы пытаетесь довольно сложный рабочий процесс без понимания основ инструментов, которые вы пытаетесь использовать.
1.Не следует создавать ActorSystem
каждый раз, когда вызывается функция. Обычно на 1 приложение приходится 1 ActorSystem, и оно создается только один раз.
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
def readStream(...
2.Вы должны стараться избегать изменчивых коллекций и вместо этого использовать Iterator
с соответствующей функциональностью:
def concatFiles(path : String, date : String, numberOfDays : Int) : List[scala.Array[String]] = {
val formattedDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))
val pathsIterator : () => Iterator[Path] = () =>
Iterator
.range(0, numberOfDays+1)
.map(formattedDate.minusDays)
.map(_.String().replace("-", "")
.map(path => Paths.get(path + "transactions_" + date + ".data")
lineSourceFromPaths(pathsIterator)
3. Поскольку вы имеете дело с фьючерсами, вам не следует ждать завершения фьючерса, а вместо этого следует изменить тип возврата с concateFiles
на Future[List[Array[String]]]
.