У меня следующий алгоритм: чтение из CSV-файла в отказоустойчивом режиме с указанной схемой. Если схема неверна, обработайте исключение SparkException, снова прочитав CSV (без указанной схемы).
implicit val ss: SparkSession = SparkSession.builder()/*...*/.getOrCreate()
val inputSchema = StructType( ... )
val path = "src/main/resources/test.csv"
try {
val inputDFStructured = readCSV(path, Some(inputSchema), Map("header" -> "true", "mode" -> "FAILFAST"))
//... number of different transformations with structuredDF
inputDFStructured.write.parquet("...")
} catch {
case se: SparkException => {
println("Failed to read using specified schema: " + se.getMessage)
val inputDFStringSchema = readCSV(path, None, Map("header" -> "true"))
//... number of different transformations with inputDFStringSchema
inputDFStringSchema.write.parquet("...")
}
}
def readCSV(path: String, schema: Option[StructType], options: Map[String, String])(implicit ss: SparkSession): DataFrame = {
ss.read.schema(schema.orNull).options(options).csv(path)
}
Этот код безопасен, учитывая ленивую оценку?
Возможно ли, что я получу несколько строк, записанных в выходной путь, прежде чем он перейдет к блоку перехвата из-за выданного исключения проверки схемы (из-за ленивой оценки или чего-то в этом роде)?