Как получить потоковый Iterator [Node] из большого XML-документа? - PullRequest
9 голосов
/ 15 декабря 2011

Мне нужно обработать XML-документы, которые состоят из очень большого числа независимых записей, например,

<employees>
    <employee>
         <firstName>Kermit</firstName>
         <lastName>Frog</lastName>
         <role>Singer</role>
    </employee>
    <employee>
         <firstName>Oscar</firstName>
         <lastName>Grouch</lastName>
         <role>Garbageman</role>
    </employee>
    ...
</employees>

В некоторых случаях это просто большие файлы, но в других они могут поступать из потокового источника.

Я не могу просто scala.xml.XmlLoader.load (), потому что я не хочу держать весь документ в памяти (или ждать закрытия входного потока), когда мне нужно толькоработать с одной записью за раз.Я знаю, что могу использовать XmlEventReader для потоковой передачи ввода в виде последовательности XmlEvents.Однако с ними гораздо менее удобно работать, чем с scala.xml.Node.

Так что я бы хотел как-нибудь извлечь из этого ленивый Iterator [Node], чтобы работать с каждой отдельной записью, используяудобный синтаксис Scala, сохраняя контроль над использованием памяти.

Чтобы сделать это самостоятельно, я мог бы начать с XmlEventReader, создать буфер событий между каждым совпадающим начальным и конечным тегом, а затем построить дерево узлов изтот.Но есть ли более простой способ, который я упустил?Спасибо за любые идеи!

Ответы [ 2 ]

8 голосов
/ 16 декабря 2011

Вы можете использовать основной синтаксический анализатор, используемый XMLEventReader - ConstructingParser, и обрабатывать узлы ваших сотрудников ниже верхнего уровня с помощью обратного вызова.Вам просто нужно быть осторожным, отбрасывая данные, как только они будут обработаны:

import scala.xml._

def processSource[T](input: Source)(f: NodeSeq => T) {
  new scala.xml.parsing.ConstructingParser(input, false) {
    nextch // initialize per documentation
    document // trigger parsing by requesting document

    var depth = 0 // track depth

    override def elemStart(pos: Int, pre: String, label: String,
        attrs: MetaData, scope: NamespaceBinding) {
      super.elemStart(pos, pre, label, attrs, scope)
      depth += 1
    }
    override def elemEnd(pos: Int, pre: String, label: String) {
      depth -= 1
      super.elemEnd(pos, pre, label)
    }
    override def elem(pos: Int, pre: String, label: String, attrs: MetaData,
        pscope: NamespaceBinding, nodes: NodeSeq): NodeSeq = {
      val node = super.elem(pos, pre, label, attrs, pscope, nodes)
      depth match {
        case 1 => <dummy/> // dummy final roll up
        case 2 => f(node); NodeSeq.Empty // process and discard employee nodes
        case _ => node // roll up other nodes
      }
    }
  }
}

Затем вы используете это для обработки каждого узла на втором уровне в постоянной памяти (при условии, что узлы на втором уровне не являютсяполучение произвольного числа детей):

processSource(src){ node =>
  // process here
  println(node)
}

Преимущество по сравнению с XMLEventReader заключается в том, что вы не используете два потока.Также вам не нужно анализировать узел дважды по сравнению с предложенным решением.Недостатком является то, что это зависит от внутренней работы ConstructingParser.

5 голосов
/ 16 декабря 2011

Чтобы перейти от решения генератора huynhjl к TraversableOnce[Node], используйте этот трюк :

def generatorToTraversable[T](func: (T => Unit) => Unit) = 
  new Traversable[T] {
    def foreach[X](f: T => X) {
      func(f(_))
    }
  }

def firstLevelNodes(input: Source): TraversableOnce[Node] =
  generatorToTraversable(processSource(input))

Результат generatorToTraversable не может быть пройден более одного раза (даже если новый ConstructingParserсоздается при каждом вызове foreach), поскольку входной поток является источником, который является итератором.Однако мы не можем переопределить Traversable.isTraversableAgain, потому что он окончательный.

На самом деле мы хотели бы применить это, просто возвращая Iterator.Тем не менее, и Traversable.toIterator, и Traversable.view.toIterator создают промежуточный поток, который будет кэшировать все записи (игнорируя всю цель этого упражнения).Ну что ж;Я просто позволю потоку генерировать исключение, если к нему обращаются дважды.

Также обратите внимание, что все это не является потокобезопасным.

Этот код работает отлично, и я считаю, что общее решение длябыть ленивым и не кэшировать (следовательно, иметь постоянную память), хотя я еще не пробовал его на большом входе.

...