Насколько я знаю .... К сожалению, до сегодняшнего дня он не был доступен в xml пакете spark в декларативном виде ... так, как вы ожидаете ...
Json работал, так как FailureSafeParser
был реализован, как показано ниже ... в DataFrameReader
/**
* Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
* text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
*
* Unless the schema is specified using `schema` function, this function goes through the
* input once to determine the input schema.
*
* @param jsonDataset input Dataset with one JSON object per record
* @since 2.2.0
*/
def json(jsonDataset: Dataset[String]): DataFrame = {
val parsedOptions = new JSONOptions(
extraOptions.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val schema = userSpecifiedSchema.getOrElse {
TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
}
ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val actualSchema =
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions { iter =>
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
schema,
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
}
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
}
вы можете реализовать функцию программным способом.
прочитайте все файлы в папке, используя sc.textFile
.файл foreach с помощью парсера xml разбирает записи.
Если допустимо перенаправление на другой путь.
Если он недействителен, введите неправильный путь записи.