Scala: функционально перебирать файлы CSV? - PullRequest
6 голосов
/ 12 сентября 2011

У меня есть CSV-файлы с комментариями, которые дают имена столбцов, где столбцы меняются по всему файлу:

#c1,c2,c3
a,b,c
d,e,f
#c4,c5
g,h
i,j

Я хочу предоставить способ перебирать (только) строки данных файла в виде сопоставления имен столбцов со значениями (все строки). Таким образом, выше будет:

Map(c1 -> a, c2 -> b, c3 -> c)
Map(c1 -> d, c2 -> e, c3 -> f)
Map(c4 -> g, c5 -> h)
Map(c4 -> i, c5 -> j)

Файлы очень большие, поэтому чтение всего в память не вариант. Прямо сейчас у меня есть класс Iterator, который поддерживает некрасивое состояние между hasNext() и next(); Я также предоставляю средства доступа для текущего номера строки и фактической последней строки и прочитанного комментария (в случае, если потребители заботятся о порядке полей). Я хотел бы попытаться сделать вещи более функциональным способом.

Моей первой идеей было понимание: я могу перебирать строки файла, пропуская строки комментариев с помощью предложения фильтра. Я могу yield кортеж, содержащий карту, номер строки и т. Д. Проблема в том, что мне нужно запомнить последние имена столбцов, чтобы я мог создавать Карты из них. По понятным причинам для циклов старайтесь препятствовать сохранению состояния, просто позволяя вам установить новые val s. Из этого вопроса я узнал, что я могу обновить переменные-члены в блоке yield, но именно тогда я не хочу обновить их в моем случае!

Я мог бы вызвать функцию в предложении итерации, которая обновляет состояние, но это кажется грязным. Итак, что является лучшим способом сделать это в функциональном стиле? Злоупотребление для понимания? Взломать scanLeft ? Использовать библиотеку? Возьмите парсер комбинатор большие пушки? Или функциональный стиль просто не подходит для этой проблемы?

Ответы [ 7 ]

5 голосов
/ 14 сентября 2011

Государственная Монада FTW!

На самом деле, я сосу из государственной монады. Я чертовски долго писал об этом, и у меня было сильное чувство, что это можно сделать намного лучше. В частности, мне кажется, что traverse - это путь, но ...

// Get Scalaz on the job
import scalaz._
import Scalaz._

// Some type aliases to make stuff clearer
type Input         = Stream[String]
type Header        = String
type InternalState = (Input, Header)
type Output        = Option[(Header, String)]
type MyState       = State[InternalState, Output]

// Detect headers
def isHeader(line: String) = line(0) == '#'

// From a state, produce an output
def makeLine: (InternalState => Output) = {
    case (head #:: _, _) if isHeader(head) => None
    case (head #:: _, header)              => Some(header -> head)
    case _                                 => None
}

// From a state, produce the next state
def nextLine: (InternalState => InternalState) = {
    case (head #:: tail, _) if isHeader(head) => tail -> head
    case (_ #:: tail, header)                 => tail -> header
    case _                                    => Stream.empty -> ""
}

// My state is defined by the functions producing the next state
// and the output
val myState: MyState = state(s => nextLine(s) -> makeLine(s))    

// Some input to test it. I'm trimming it to avoid problems on REPL
val input = """#c1,c2,c3
a,b,c
d,e,f
#c4,c5
g,h
i,j""".lines.map(_.trim).toStream

// My State/Output Stream -- def to avoid keeping a reference to the head
def stateOutputStream = Stream.iterate(myState(input, "")){ 
        case (s, _) => myState(s) 
    } takeWhile { case ((stream, _), output) => stream.nonEmpty || output.nonEmpty }

// My Output Stream -- flatMap gets rid of the None from the headers
def outputStream = stateOutputStream flatMap { case (_, output) => output }

// Now just get the map
def outputToMap: (Header, String) => Map[String, String] = {
    case (header, line) =>
        val keys = header substring 1 split ","
        val values = line split ","
        keys zip values toMap
}

// And this is the result -- note that I'm still avoiding "val" so memory
// won't leak
def result = outputStream map outputToMap.tupled
3 голосов
/ 14 сентября 2011

Вот один из способов сделать это с помощью Iteratees.Поток представлен как функция от Iteratee до Iteratee, поэтому он никогда не реализуется в памяти.Я использую монаду State для отслеживания последнего встреченного заголовка.

import scalaz._
import Scalaz._
import IterV._

type Header = List[String]
type MyState[A] = State[Header, A]
type Out = Map[String, String]

// Detect headers
def isHeader(line: String) = line(0) == '#'

type Enumeratee[A, B, C] =
  IterV[B, C] => Iteratee[MyState, A, IterV[B, C]]

// Enumerate a list. Just for demonstration.
def enumerateM[M[_]: Monad, E, A]:
  (List[E], Iteratee[M, E, A]) => Iteratee[M, E, A] = {
    case (Nil, i) => i
    case (x :: xs, Iteratee(m)) => Iteratee(for {
      v <- m
      o <- v match {
        case d@DoneM(_, _) => d.pure[M]
        case ContM(k) => enumerateM.apply(xs, k(El(x))).value
      }
    } yield o)
  }

def stateTrans[A]: Enumeratee[String, Map[String, String], A] =
  i => Iteratee(i.fold(
         done = (_, _) => DoneM(i, Empty.apply).pure[MyState],
         cont = k => ContM((x: Input[String]) => x match {
           case El(e) => Iteratee[MyState, String, IterV[Out, A]](for {
             h <- init
             o <- if (isHeader(e))
                    put(e substring 1 split "," toList) map (_ => Empty[Out])
                  else El((h zip (e split ",")).toMap).pure[MyState]
             v <- stateTrans(k(o)).value
           } yield v)
           case Empty() => stateTrans(k(Empty.apply))
           case EOF() => stateTrans(k(EOF.apply))
         }).pure[MyState]
       ))

Давайте проверим это и возьмем верхнюю часть выходного потока:

scala> (enumerateM[MyState, String, IterV[Out, Option[Out]]].apply(
     | List("#c1,c2,c3","a,b,c","d,e,f"), stateTrans(head)).value ! List())
     | match { case DoneM(a, _) => a match { case Done(b, _) => b } }
res0: Option[Out] = Some(Map(c1 -> a, c2 -> b, c3 -> c))

Это может быть намного лучшепутем абстрагирования некоторых из этих вещей в вспомогательные функции.

2 голосов
/ 15 сентября 2011

Ну, вот Python ...

from collections import namedtuple

def read_shifty_csv(csv_file):
    cols = None
    for line in csv_file:
        line = line.strip()
        if line.startswith('#'):
            cols = namedtuple('cols', line[1:].split(','))
        else:
            yield cols(*line.split(','))._asdict()

Бросьте вызов _asdict (), если вы предпочитаете работать с кортежем, а не с отображением (dict).Только материализует строку за раз в памяти.

Редактируйте , чтобы попытаться быть немного более функциональным:

from collections import namedtuple
from itertools import imap

def read_shifty_csv(csv_file):
    cols = None
    for line in imap(str.strip, csv_file):
        if line.startswith('#'):
            cols = namedtuple('cols', line[1:].split(','))
        else:
            yield cols(*line.split(','))._asdict()

Просто отбросьте злое переназначение строки = строки.полоса ()

2 голосов
/ 12 сентября 2011

Возможно, будет более элегантно, но вы получите упражнение:

  def read(lines: Iterator[String], currentHeadings: Option[Seq[String]] = None): Stream[Option[Map[String, String]]] = 
    if (lines.hasNext) {
      val l = lines.next
      if (l.startsWith("#"))
        Stream.cons(
          None,
          read(lines, Some(l.tail.split(","))))
      else
        Stream.cons(
          currentHeadings.map(_.zip(l.split(",")).toMap),
          read(lines, currentHeadings))
    } else Stream.cons(None, Stream.Empty)

  def main(args: Array[String]): Unit = {
    val lines = scala.io.Source.fromFile("data.csv").getLines
    println(read(lines).flatten.toList)
  }

Печать:

List(Map(c1 -> a, c2 -> b, c3 -> c), Map(c1 -> d, c2 -> e, c3 -> f), Map(c4 -> g, c5 -> h), Map(c4 -> i, c5 -> j))
2 голосов
/ 12 сентября 2011

Вот возможное решение:

Сначала взгляните на ответ на Разделите список по каждому элементу, удовлетворяющему предикату (Scala) , что даст вам функцию groupPrefix. Вы получаете метод groupPrefix, который разделяет список на список, разделяет вхождения, когда элемент удовлетворяет заданному предикату. Таким образом, вы разделяете свой список, начиная с каждой строки комментария (определение столбцов), и впоследствии содержащий соответствующие данные

Эта процедура затем преобразует один из подсписков (начиная с имен столбцов) в списке соответствующей карты.

import scala.collection.immutable.ListMap 
  // to keep the order of the columns. If not needed, just use Map
def toNamedFields(lines: List[String]) : List[Map[String, String]] = {
  val columns = lines.head.tail.split(",").toList // tail to discard the #
  lines.tail.map{line => ListMap(columns.zip(line.split(",")): _*)}
}

При этом вы разбиваете свои линии, получаете карты в каждой группе, получаете список из списка карт, который вы превращаете в один список со сглаживанием

groupPrefix(lines){_.startsWith("#")}.map(toNamedFields).flatten
1 голос
/ 16 сентября 2011

Вдохновленный доблестными усилиями @ schmichael's по созданию функционального решения Python, вот моя попытка слишком далеко зайти.Я не утверждаю, что он обслуживаемый, эффективный, примерный или пригодный для использования, но он функциональный:

from itertools import imap, groupby, izip, chain
from collections import deque
from operator import itemgetter, methodcaller
from functools import partial

def shifty_csv_dicts(lines):
    last = lambda seq: deque(seq, maxlen=1).pop()
    parse_header = lambda header: header[1:-1].split(',')
    parse_row = lambda row: row.rstrip('\n').split(',')
    mkdict = lambda keys, vals: dict(izip(keys,vals))
    headers_then_rows = imap(itemgetter(1), groupby(lines, methodcaller('startswith', '#')))
    return chain.from_iterable(imap(partial(mkdict, parse_header(last(headers))), imap(parse_row, next(headers_then_rows))) for headers in headers_then_rows)

Хорошо, давайте распакуем это.

Основная идея заключается в том, чтобы (ab) использоватьitertools.groupby для распознавания изменений из заголовков в строки данных.Мы используем семантику оценки аргумента для управления порядком операций.

Сначала мы сообщаем groupby сгруппировать строки по тому, начинаются ли они с '#':

methodcaller('startswith', '#')

создает функцию, которая принимает строку и вызывает line.startswith('#') (ееэквивалентно стилистически предпочтительному, но менее эффективному lambda line: line.startswith('#')).

Итак, groupby принимает входящую итерируемую величину lines и чередует возврат итерируемой строки заголовка (обычно только один заголовок) иповторяемость строк данных.Он на самом деле возвращает кортеж (group_val, group_iter), где в этом случае group_val - это bool, указывающий, является ли это заголовком.Итак, мы делаем эквивалент (group_val, group_iter)[1] для всех кортежей, чтобы выбрать итераторы: itemgetter(1) - это просто функция, которая запускает «[1]» для всего, что вы ей даете (снова эквивалентно, но более эффективно, чем * 1026)*).Поэтому мы используем imap, чтобы запустить нашу itemgetter функцию для каждого кортежа, возвращенного groupby, чтобы выбрать итераторы заголовка / данных:

imap(itemgetter(1), groupby(lines, methodcaller('startswith', '#')))

Сначала мы вычислим это выражение и дадим ему имя, потому чтомы будем использовать его дважды позже, сначала для заголовков, затем для данных.Внешний вызов:

chain.from_iterable(... for headers in headers_then_rows)

проходит через итераторы, возвращаемые из groupby.Мы ведем себя хитро и вызываем значение headers, потому что какой-то другой код внутри ... отбирает rows, когда мы не ищем, продвигая итератор groupby в процессе.Это выражение внешнего генератора будет всегда создавать только заголовки (помните, они изменяют: заголовки, данные, заголовки, данные ...).Хитрость заключается в том, чтобы убедиться, что заголовки используются перед строками, потому что они оба используют один и тот же базовый итератор.chain.from_iterable просто объединяет результаты всех итераторов строк данных в один итератор, чтобы вернуть их все.

Так что же мы соединяем?Что ж, нам нужно взять (последний) заголовок, сжать его с каждой строкой значений и сделать из этого подсказки.Это:

last = lambda seq: deque(seq, maxlen=1).pop()

- несколько грязный, но эффективный хак для получения последнего элемента от итератора, в нашем случае это строка заголовка.Затем мы анализируем заголовок, обрезая начальный # и завершающий символ новой строки, и разделяем на ,, чтобы получить список имен столбцов:

parse_header = lambda header: header[1:-1].split(',')

Но мы хотим сделать это только один раз для каждогоИтератор строк, потому что он исчерпывает наш итератор заголовков (и мы бы не хотели копировать его в какое-то изменяемое состояние, не так ли?).Мы также должны убедиться, что итератор заголовков используется перед строками.Решение состоит в том, чтобы создать частично примененную функцию, оценивая и фиксируя заголовки в качестве первого параметра, и принимая строку в качестве второго параметра:

partial(mkdict, parse_header(last(headers)))

Функция mkdict использует имена столбцов в качестве ключей и строкиданные как значения, чтобы сделать диктовку:

mkdict = lambda keys, vals: dict(izip(keys,vals))

Это дает нам функцию, которая замораживает первый параметр (keys) и позволяет просто передать второй параметр (vals): именно то, что нам нужнодля создания набора диктов с одинаковыми ключами и разными значениями.

Чтобы использовать его, мы анализируем каждую строку так, как вы ожидаете:

parse_row = lambda row: row.rstrip('\n').split(',')

, напоминая, что next(headers_then_rows) вернетитератор строк данных из groupby (поскольку мы уже использовали итератор заголовков):

imap(parse_row, next(headers_then_rows))

Наконец, мы отображаем нашу частично примененную функцию dict-maker на проанализированные строки:

imap(partial(...), imap(parse_row, next(headers_then_rows)))

И все они сшиты chain.from_iterable, чтобы создать один большой, счастливый, функциональный поток изменчивых CSV-диктов.

Для справки, это, вероятно, можно упростить, и я все равно буду делать вещи @sПуть Чмайкла.Но я понял, как это понять, и попробую применить эти идеи в решении Scala.

0 голосов
/ 12 сентября 2011

РЕДАКТИРОВАТЬ: поцарапать это, я не думаю, что вам нужны монады

...