Я читаю большое количество CSV из S3 (все с префиксом ключа) и создаю строго типизированный Dataset
.
val events: DataFrame = cdcFs.getStream()
events
.withColumn("event", lit("I"))
.withColumn("source", lit(sourceName))
.as[TradeRecord]
, где TradeRecord
- это класс дела, который можетобычно быть десериализованным в последствиях SparkSession.Однако для определенной партии запись не может быть десериализована.Вот ошибка (трассировка стека пропущена)
Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "deal")
- root class: "com.company.trades.TradeRecord"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).
deal
, являющаяся полем TradeRecord
, которое никогда не должно быть нулевым в исходных данных (объектах S3), поэтому это не Option
.
К сожалению, сообщение об ошибке не дает мне никакого представления о том, как выглядят данные CSV или даже из какого CSV-файла они поступают.Пакет состоит из сотен файлов, поэтому мне нужен способ сузить это до не более нескольких файлов, чтобы исследовать проблему.