Spark не удалось десериализовать запись при создании набора данных - PullRequest
0 голосов
/ 17 декабря 2018

Я читаю большое количество CSV из S3 (все с префиксом ключа) и создаю строго типизированный Dataset.

val events: DataFrame = cdcFs.getStream()
events
  .withColumn("event", lit("I"))
  .withColumn("source", lit(sourceName))
  .as[TradeRecord]

, где TradeRecord - это класс дела, который можетобычно быть десериализованным в последствиях SparkSession.Однако для определенной партии запись не может быть десериализована.Вот ошибка (трассировка стека пропущена)

Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "deal")
- root class: "com.company.trades.TradeRecord"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

deal, являющаяся полем TradeRecord, которое никогда не должно быть нулевым в исходных данных (объектах S3), поэтому это не Option.

К сожалению, сообщение об ошибке не дает мне никакого представления о том, как выглядят данные CSV или даже из какого CSV-файла они поступают.Пакет состоит из сотен файлов, поэтому мне нужен способ сузить это до не более нескольких файлов, чтобы исследовать проблему.

Ответы [ 2 ]

0 голосов
/ 19 декабря 2018

Вот решение, которое я предложил (я использую Spark Structured Streaming):

val stream = spark.readStream
  .format("csv")
  .schema(schema) // a StructType defined elsewhere
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "corruptRecord")
  .load(path)

// If debugging, check for any corrupted CSVs
if (log.isDebugEnabled) { // org.apache.spark.internal.Logging trait 
  import spark.implicits._
  stream
    .filter($"corruptRecord".isNotNull)
    .withColumn("input_file", input_file_name)
    .select($"input_file", $"corruptRecord")
    .writeStream
    .format("console")
    .option("truncate", false)
    .start()
}

val events = stream
  .withColumn("event", lit("I"))
  .withColumn("source", lit(sourceName))
  .as[TradeRecord]

В основном, если уровень журнала Spark установлен на Debug или ниже, DataFrame проверяется на наличие поврежденных записей и любыхтакие записи распечатываются вместе с именами их файлов.В конце концов программа пытается преобразовать этот DataFrame в строго типизированный Dataset[TradeRecord] и завершается неудачей.

0 голосов
/ 17 декабря 2018

Как предложил от user10465355 Вы можете загрузить данные:

val events: DataFrame = ???

Фильтр

val mismatched = events.where($"deal".isNull)

Добавить имя файла

import org.apache.spark.sql.functions.input_file_name

val tagged = mismatched.withColumn("_file_name", input_file_name)

При желании можно добавить чанк, чанк и смещение:

import org.apache.spark.sql.functions.{spark_partition_id, monotonically_increasing_id, shiftLeft, shiftRight

df
  .withColumn("chunk", spark_partition_id())
  .withColumn(
    "offset",
    monotonically_increasing_id - shiftLeft(shiftRight(monotonically_increasing_id, 33), 33))
...