Как мне прочитать большой CSV-файл с помощью класса Scala Stream? - PullRequest
38 голосов
/ 23 ноября 2010

Как мне прочитать большой CSV-файл (> 1 Гб) с помощью Scala Stream?У вас есть пример кода?Или вы бы использовали другой способ для чтения большого CSV-файла без предварительной загрузки его в память?

Ответы [ 3 ]

68 голосов
/ 23 ноября 2010

Просто используйте Source.fromFile(...).getLines, как вы уже заявили.

, который возвращает Iterator, который уже ленив (вы бы использовали поток в качестве отложенной коллекции, где вы хотели, чтобы ранее полученные значения были запомнены, так что выможете прочитать их снова)

Если у вас проблемы с памятью, проблема будет в том, что вы делаете после getLines.Любая операция, такая как toList, которая вызывает строгий сбор, вызовет проблему.

12 голосов
/ 23 ноября 2010

Надеюсь, вы не имеете в виду Scala collection.immutable.Stream с Stream. Это не , что вы хотите. Поток ленив, но запоминает.

Я не знаю, что вы планируете делать, но простое чтение файла построчно должно работать очень хорошо без использования большого количества памяти.

getLines следует оценивать лениво и не вылетать (если в вашем файле не более 2? 2 строк, аааик). Если это так, спросите на #scala или подайте заявку на ошибку (или сделайте и то, и другое).

3 голосов
/ 15 декабря 2015

Если вы хотите обрабатывать большой файл построчно, не требуя загрузки всего содержимого файла сразу в память, тогда вы можете использовать Iterator, возвращаемый scala.io.Source.

У меня есть небольшая функция tryProcessSource (содержащая две подфункции), которую я использую именно для этих типов сценариев использования.Функция принимает до четырех параметров, из которых требуется только первый.Остальные параметры имеют нормальные значения по умолчанию.

Вот профиль функции (полная реализация функции находится внизу):

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues),
): Try[List[List[String]]] = {
  ???
}

Требуется первый параметр, file: File.И это просто любой действительный экземпляр java.io.File, который указывает на строчно-ориентированный текстовый файл, например CSV.

Второй параметр, parseLine: (Int, String) => Option[List[String]], является необязательным.И если предусмотрено, это должна быть функция, ожидающая получения двух входных параметров;index: Int, unparsedLine: String.А затем верните Option[List[String]].Функция может возвращать Some упакованный List[String], состоящий из допустимых значений столбца.Или он может вернуть None, который указывает, что весь процесс потоковой передачи прерывается на ранней стадии.Если этот параметр не указан, предоставляется значение по умолчанию (index, line) => Some(List(line)).Это значение по умолчанию приводит к тому, что вся строка возвращается как единственное значение String.

Третий параметр, filterLine: (Int, List[String]) => Option[Boolean], является необязательным.И если предусмотрено, это должна быть функция, ожидающая получения двух входных параметров;index: Int, parsedValues: List[String].А затем верните Option[Boolean].Функция может возвращать Some wrapped Boolean, указывающее, должна ли эта конкретная строка быть включена в вывод.Или он может вернуть None, который указывает, что весь процесс потоковой передачи прерывается рано.Если этот параметр не указан, предоставляется значение по умолчанию (index, values) => Some(true).Это значение по умолчанию приводит к включению всех строк.

Четвертый и последний параметр, retainValues: (Int, List[String]) => Option[List[String]], является необязательным.И если предусмотрено, это должна быть функция, ожидающая получения двух входных параметров;index: Int, parsedValues: List[String].А затем верните Option[List[String]].Функция может возвращать Some упакованный List[String], состоящий из некоторого подмножества и / или изменения существующих значений столбца.Или он может вернуть None, который указывает, что весь процесс потоковой передачи прерывается на ранней стадии.Если этот параметр не указан, предоставляется значение по умолчанию (index, values) => Some(values).Это значение по умолчанию приводит к значениям, анализируемым вторым параметром, parseLine.

Рассмотрим файл со следующим содержанием (4 строки):

street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240

Следующий профиль вызова...

val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))

... приводит к выводу для tryLinesDefaults (неизмененного содержимого файла):

Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)

Следующий профиль вызова ...

val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )

... приводит к этому выводу для tryLinesParseOnly (каждая строка разбирается в значения отдельных столбцов):

Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)

Следующий профиль вызова ...

val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("C:/Users/Jim/Desktop/test.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )

... приводит к этому выводу для tryLinesIrvingTxNoHeader (каждая строка анализируется в значениях отдельных столбцов, без заголовка и только в двух строках в Ирвинге, Техас):

Success(
  List(
    List("100 Main Str","","Irving,TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
  )
)

Вот полная реализация функции tryProcessSource:

import scala.io.Source
import scala.util.Try

import java.io.File

def tryProcessSource(
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  retainValues: (Int, List[String]) => Option[List[String]] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}
  def recursive(
    remaining: Iterator[(String, Int)],
    accumulator: List[List[String]],
    isEarlyAbort: Boolean =
      false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) =
        remaining.next
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }
  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}

Хотя это решение было относительно лаконичным, мне потребовалось немало времени и много проходов по рефакторингу, прежде чем я наконец смог добраться сюда.Пожалуйста, дайте мне знать, если вы видите какие-либо способы его улучшения.


ОБНОВЛЕНИЕ: я только что задал вопрос ниже как это собственный вопрос StackOverflow .И теперь у есть ответ, исправляющий ошибку , упомянутую ниже.

У меня была идея попробовать сделать это еще более универсальным, изменив параметр retainValues на transformLine с новыми обобщениямиопределение функции ниже.Тем не менее, я получаю ошибку выделения в IntelliJ «Выражение типа Some [List [String]] не соответствует ожидаемому типу Option [A]» и не смог выяснить, как изменить значение по умолчанию, чтобы ошибкауходит.

def tryProcessSource2[A <: AnyRef](
  file: File,
  parseLine: (Int, String) => Option[List[String]] =
    (index, unparsedLine) => Some(List(unparsedLine)),
  filterLine: (Int, List[String]) => Option[Boolean] =
    (index, parsedValues) => Some(true),
  transformLine: (Int, List[String]) => Option[A] =
    (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}

Любая помощь в том, как сделать эту работу будет принята с благодарностью.

...