Обработка несоответствий схемы в Spark - PullRequest
0 голосов
/ 14 ноября 2018

Я читаю CSV-файл, используя Spark в Scala. Схема предопределена, и я использую ее для чтения. Это пример кода:

// create the schema
val schema= StructType(Array(
      StructField("col1", IntegerType,false),
      StructField("col2", StringType,false),
      StructField("col3", StringType,true)))

// Initialize Spark session
val spark: SparkSession = SparkSession.builder
    .appName("Parquet Converter")
    .getOrCreate

// Create a data frame from a csv file
val dataFrame: DataFrame =
spark.read.format("csv").schema(schema).option("header", false).load(inputCsvPath)

Из того, что я прочитал, когда читал CAV со Spark по схеме, есть 3 варианта:

  1. Установите режим на DROPMALFORMED -> это приведет к удалению строк, не соответствующих схеме
  2. Установить режим на PERMISSIVE -> это установит всю строку на нулевые значения
  3. Установить режим на FAILFAST -> это вызовет исключение при обнаружении несоответствия

Как лучше всего сочетать варианты? Мне нужно получить несоответствия в схеме, распечатать их как ошибки и игнорировать строки в моем фрейме данных. По сути, я хочу сочетание FAILFAST и DROPMALFORMED.

Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 24 декабря 2018

Вот что я в итоге сделал:
Я добавил в схему столбец "_corrupt_record", например:

val schema= StructType(Array(
    StructField("col1", IntegerType,true),    
    StructField("col2", StringType,false),
    StructField("col3", StringType,true),
    StructField("_corrupt_record", StringType, true)))

Затем я читаю CSV в режиме PERMISSIVE (это Spark по умолчанию):

val dataFrame: DataFrame = spark.read.format("csv")
                                .schema(schema)
                                .option("header", false)
                                .option("mode", "PERMISSIVE")
                                .load(inputCsvPath)

Теперь мой фрейм данных содержит дополнительный столбец, содержащий строки с несоответствиями схемы. Я отфильтровал строки, которые не соответствовали данным, и распечатал их:

val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
0 голосов
/ 14 ноября 2018

Просто используйте DROPMALFORMED и следите за журналом.Если имеются искаженные записи, они выгружаются в журнал, до предела, установленного параметром maxMalformedLogPerPartition.

spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "DROPMALFORMED")
  .option("maxMalformedLogPerPartition", 128)
  .load(inputCsvPath)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...