Вот что я в итоге сделал:
Я добавил в схему столбец "_corrupt_record", например:
val schema= StructType(Array(
StructField("col1", IntegerType,true),
StructField("col2", StringType,false),
StructField("col3", StringType,true),
StructField("_corrupt_record", StringType, true)))
Затем я читаю CSV в режиме PERMISSIVE (это Spark по умолчанию):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Теперь мой фрейм данных содержит дополнительный столбец, содержащий строки с несоответствиями схемы.
Я отфильтровал строки, которые не соответствовали данным, и распечатал их:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()