Я создаю новый фрейм данных (заданный классом дел) с входным фреймом данных, который может иметь меньшее / другое количество столбцов, чем существующий.Я пытаюсь использовать класс дел для установки значения несуществующего как нулевого.
Я использую этот класс дел для управления созданием нового кадра данных.
Входной кадр данных (Входящий DD) может не иметь поля всех переменных, которые установлены как пустое значение выше.
case class existingSchema(source_key: Int
, sequence_number: Int
, subscriber_id: String
, subscriber_ssn: String
, last_name: String
, first_name: String
, variable1: String = null
, variable2: String = null
, variable3: String = null
, variable4: String = null
, variable5: String = null
, source_date: Date
, load_date: Date
, file_name_String: String)
val incomingDf = spark.table("raw.incoming")
val formattedDf = incomingDf.as[existingSchema].toDF()
Это выдает ошибку во время компиляции.
Ожидается, что новая схема formattedDf будет иметьта же схема, что и в случае с существующим классом существующей Схемы.
incomingDf.printSchema
root
|-- source_key: integer (nullable = true)
|-- sequence_number: integer (nullable = true)
|-- subscriber_id: string (nullable = true)
|-- subscriber_ssn: string (nullable = true)
|-- last_name: string (nullable = true)
|-- first_name: string (nullable = true)
|-- variable1: string (nullable=true)
|-- variable3: string (nullable = true)
|-- source_date: date (nullable = true)
|-- load_date: date (nullable = true)
|-- file_name_string: string (nullable = true)
Ошибка компиляции:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val formattedDf = incomingDf.as[existingSchema].toDF()
^
one error found
FAILED
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':compileScala'.
> Compilation failed
Обновление: я добавил строку кода:
import incomingDf.sparkSession.implicits._
икомпиляция в порядке.
Теперь я получаю следующую ошибку во время выполнения:
19/04/17 14:37:56 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`variable2`' given input columns: [variable1, variable3, sequence_number, last_name, first_name, file_name_string, subscriber_id, load_date, source_key];
org.apache.spark.sql.AnalysisException: cannot resolve '`variable2`' given input columns: [variable1, variable3, sequence_number, last_name, first_name, file_name_string, subscriber_id, load_date, source_key];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)