Spark читает failfast csv и перечитывает снова в блоке catch - PullRequest
0 голосов
/ 03 сентября 2018

У меня следующий алгоритм: чтение из CSV-файла в отказоустойчивом режиме с указанной схемой. Если схема неверна, обработайте исключение SparkException, снова прочитав CSV (без указанной схемы).

implicit val ss: SparkSession = SparkSession.builder()/*...*/.getOrCreate()

val inputSchema = StructType( ... )
val path = "src/main/resources/test.csv"

try {
  val inputDFStructured = readCSV(path, Some(inputSchema), Map("header" -> "true", "mode" -> "FAILFAST"))

  //... number of different transformations with structuredDF

  inputDFStructured.write.parquet("...")

} catch {
  case se: SparkException => {
    println("Failed to read using specified schema: " + se.getMessage)

    val inputDFStringSchema = readCSV(path, None, Map("header" -> "true"))

    //... number of different transformations with inputDFStringSchema

    inputDFStringSchema.write.parquet("...")
  }
}

  def readCSV(path: String, schema: Option[StructType], options: Map[String, String])(implicit ss: SparkSession): DataFrame = {
       ss.read.schema(schema.orNull).options(options).csv(path)
}
  1. Этот код безопасен, учитывая ленивую оценку?

  2. Возможно ли, что я получу несколько строк, записанных в выходной путь, прежде чем он перейдет к блоку перехвата из-за выданного исключения проверки схемы (из-за ленивой оценки или чего-то в этом роде)?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...