Если вы хотите обрабатывать большой файл построчно, не требуя загрузки всего содержимого файла сразу в память, тогда вы можете использовать Iterator
, возвращаемый scala.io.Source
.
У меня есть небольшая функция tryProcessSource
(содержащая две подфункции), которую я использую именно для этих типов сценариев использования.Функция принимает до четырех параметров, из которых требуется только первый.Остальные параметры имеют нормальные значения по умолчанию.
Вот профиль функции (полная реализация функции находится внизу):
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
???
}
Требуется первый параметр, file: File
.И это просто любой действительный экземпляр java.io.File
, который указывает на строчно-ориентированный текстовый файл, например CSV.
Второй параметр, parseLine: (Int, String) => Option[List[String]]
, является необязательным.И если предусмотрено, это должна быть функция, ожидающая получения двух входных параметров;index: Int
, unparsedLine: String
.А затем верните Option[List[String]]
.Функция может возвращать Some
упакованный List[String]
, состоящий из допустимых значений столбца.Или он может вернуть None
, который указывает, что весь процесс потоковой передачи прерывается на ранней стадии.Если этот параметр не указан, предоставляется значение по умолчанию (index, line) => Some(List(line))
.Это значение по умолчанию приводит к тому, что вся строка возвращается как единственное значение String
.
Третий параметр, filterLine: (Int, List[String]) => Option[Boolean]
, является необязательным.И если предусмотрено, это должна быть функция, ожидающая получения двух входных параметров;index: Int
, parsedValues: List[String]
.А затем верните Option[Boolean]
.Функция может возвращать Some
wrapped Boolean
, указывающее, должна ли эта конкретная строка быть включена в вывод.Или он может вернуть None
, который указывает, что весь процесс потоковой передачи прерывается рано.Если этот параметр не указан, предоставляется значение по умолчанию (index, values) => Some(true)
.Это значение по умолчанию приводит к включению всех строк.
Четвертый и последний параметр, retainValues: (Int, List[String]) => Option[List[String]]
, является необязательным.И если предусмотрено, это должна быть функция, ожидающая получения двух входных параметров;index: Int
, parsedValues: List[String]
.А затем верните Option[List[String]]
.Функция может возвращать Some
упакованный List[String]
, состоящий из некоторого подмножества и / или изменения существующих значений столбца.Или он может вернуть None
, который указывает, что весь процесс потоковой передачи прерывается на ранней стадии.Если этот параметр не указан, предоставляется значение по умолчанию (index, values) => Some(values)
.Это значение по умолчанию приводит к значениям, анализируемым вторым параметром, parseLine
.
Рассмотрим файл со следующим содержанием (4 строки):
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
Следующий профиль вызова...
val tryLinesDefaults =
tryProcessSource(new File("path/to/file.csv"))
... приводит к выводу для tryLinesDefaults
(неизмененного содержимого файла):
Success(
List(
List("street,street2,city,state,zip"),
List("100 Main Str,,Irving,TX,75039"),
List("231 Park Ave,,Irving,TX,75039"),
List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
)
)
Следующий профиль вызова ...
val tryLinesParseOnly =
tryProcessSource(
new File("path/to/file.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
)
... приводит к этому выводу для tryLinesParseOnly
(каждая строка разбирается в значения отдельных столбцов):
Success(
List(
List("street","street2","city","state","zip"),
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
)
)
Следующий профиль вызова ...
val tryLinesIrvingTxNoHeader =
tryProcessSource(
new File("C:/Users/Jim/Desktop/test.csv")
, parseLine =
(index, unparsedLine) => Some(unparsedLine.split(",").toList)
, filterLine =
(index, parsedValues) =>
Some(
(index != 0) && //skip header line
(parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
(parsedValues(3).toLowerCase == "Tx".toLowerCase)
)
)
... приводит к этому выводу для tryLinesIrvingTxNoHeader
(каждая строка анализируется в значениях отдельных столбцов, без заголовка и только в двух строках в Ирвинге, Техас):
Success(
List(
List("100 Main Str","","Irving,TX","75039"),
List("231 Park Ave","","Irving","TX","75039"),
)
)
Вот полная реализация функции tryProcessSource
:
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
retainValues: (Int, List[String]) => Option[List[String]] =
(index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
try {Try(transfer(source))} finally {source.close()}
def recursive(
remaining: Iterator[(String, Int)],
accumulator: List[List[String]],
isEarlyAbort: Boolean =
false
): List[List[String]] = {
if (isEarlyAbort || !remaining.hasNext)
accumulator
else {
val (line, index) =
remaining.next
parseLine(index, line) match {
case Some(values) =>
filterLine(index, values) match {
case Some(keep) =>
if (keep)
retainValues(index, values) match {
case Some(valuesNew) =>
recursive(remaining, valuesNew :: accumulator) //capture values
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
else
recursive(remaining, accumulator) //discard row
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
case None =>
recursive(remaining, accumulator, isEarlyAbort = true) //early abort
}
}
}
Try(Source.fromFile(file)).flatMap(
bufferedSource =>
usingSource(bufferedSource) {
source =>
recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
}
)
}
Хотя это решение было относительно лаконичным, мне потребовалось немало времени и много проходов по рефакторингу, прежде чем я наконец смог добраться сюда.Пожалуйста, дайте мне знать, если вы видите какие-либо способы его улучшения.
ОБНОВЛЕНИЕ: я только что задал вопрос ниже как это собственный вопрос StackOverflow .И теперь у есть ответ, исправляющий ошибку , упомянутую ниже.
У меня была идея попробовать сделать это еще более универсальным, изменив параметр retainValues
на transformLine
с новыми обобщениямиопределение функции ниже.Тем не менее, я получаю ошибку выделения в IntelliJ «Выражение типа Some [List [String]] не соответствует ожидаемому типу Option [A]» и не смог выяснить, как изменить значение по умолчанию, чтобы ошибкауходит.
def tryProcessSource2[A <: AnyRef](
file: File,
parseLine: (Int, String) => Option[List[String]] =
(index, unparsedLine) => Some(List(unparsedLine)),
filterLine: (Int, List[String]) => Option[Boolean] =
(index, parsedValues) => Some(true),
transformLine: (Int, List[String]) => Option[A] =
(index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
???
}
Любая помощь в том, как сделать эту работу будет принята с благодарностью.