Может ли alpakka-xml обрабатывать несколько XML-файлов? - PullRequest
0 голосов
/ 21 декабря 2018

У меня возникли проблемы при использовании Alpakka XmlParsing Flow

val files: List[String] = ... // file paths locally on disk

// simple source emitting the contents of 2 XML files
val documentSource = FileIO.fromPath(Paths.get(files.head))
  .concat(FileIO.fromPath(Paths.get(files(1))))

val contentFlow: Flow[ParseEvent, CustomContent, Notused] =
  Flow.fromGraph(new ContentProcessorFlow)

documentSource
  .via(XmlParsing.parser)
  .via(contentFlow)
  .to(Sink.foreach(println))
  .run

. Когда он запускается, график печатает элементы, испускаемые contentFlow, которые являются правильными и соответствуют ожидаемым значениям для первого файла.После этого выдается это исключение:

[ERROR] [12/20/2018 16:32:23.648] [Sync-akka.actor.default-dispatcher-2] [akka://Sync/system/StreamSupervisor-0/flow-0-0-ignoreSink] Error in stage [akka.stream.alpakka.xml.impl.StreamingXmlParser@36b80955]: Illegal processing instruction target: 'xml' (case insensitive) is reserved by the xml specification
 at [row,col {unknown-source}]: [44,17]
com.fasterxml.aalto.WFCException: Illegal processing instruction target: 'xml' (case insensitive) is reserved by the xml specification
 at [row,col {unknown-source}]: [44,17]
at com.fasterxml.aalto.in.XmlScanner.reportInputProblem(XmlScanner.java:1333)
at com.fasterxml.aalto.async.AsyncByteScanner.checkPITargetName(AsyncByteScanner.java:665)
at com.fasterxml.aalto.async.AsyncByteArrayScanner.handlePI(AsyncByteArrayScanner.java:2091)
at com.fasterxml.aalto.async.AsyncByteArrayScanner.nextFromProlog(AsyncByteArrayScanner.java:1064)
at com.fasterxml.aalto.stax.StreamReaderImpl.next(StreamReaderImpl.java:802)
at akka.stream.alpakka.xml.impl.StreamingXmlParser$$anon$1.advanceParser(StreamingXmlParser.scala:55)

Я понимаю основы происходящего здесь - парсер жалуется на ByteString директивы <?xml version="1.0" encoding="UTF-8"?> в верхней части второго файла, но я 'Я недостаточно знаком с потоками, чтобы знать, что с этим делать.Если я удаляю директиву, я получаю немного другое исключение из-за наличия двух корневых элементов.

Моя цель - построить график, который читает файлы из местоположения и выдает CustomContent для дальнейшей обработки.Как я могу переработать это, чтобы каждый файл обрабатывался как отдельная единица ввода?

1 Ответ

0 голосов
/ 21 декабря 2018

Обрабатывайте файлы как отдельные Source с, затем объединяйте их в один Source:

val files: List[String] = ???

val sources: List[Source[CustomContent, Future[IOResult]]] =
  files
    .map { f =>
      FileIO.fromPath(Paths.get(f))
        .via(XMLParsing.parser)
        .via(contentFlow)
    }

val mergedSource: Source[CustomContent, NotUsed] =
  Source(sources).flatMapConcat(identity)

mergedSource.runForeach(println)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...