Использование класса case для добавления неизвестных столбцов как нулевых - PullRequest
2 голосов
/ 17 апреля 2019

Я создаю новый фрейм данных (заданный классом дел) с входным фреймом данных, который может иметь меньшее / другое количество столбцов, чем существующий.Я пытаюсь использовать класс дел для установки значения несуществующего как нулевого.

Я использую этот класс дел для управления созданием нового кадра данных.

Входной кадр данных (Входящий DD) может не иметь поля всех переменных, которые установлены как пустое значение выше.

case class existingSchema(source_key: Int
                        , sequence_number: Int
                        , subscriber_id: String
                        , subscriber_ssn: String
                        , last_name: String
                        , first_name: String
                        , variable1: String = null
                        , variable2: String = null
                        , variable3: String = null
                        , variable4: String = null
                        , variable5: String = null
                        , source_date: Date
                        , load_date: Date
                        , file_name_String: String)

val incomingDf = spark.table("raw.incoming")

val formattedDf = incomingDf.as[existingSchema].toDF()

Это выдает ошибку во время компиляции.

Ожидается, что новая схема formattedDf будет иметьта же схема, что и в случае с существующим классом существующей Схемы.

incomingDf.printSchema
root
 |-- source_key: integer (nullable = true)
 |-- sequence_number: integer (nullable = true)
 |-- subscriber_id: string (nullable = true)
 |-- subscriber_ssn: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- variable1: string (nullable=true)
 |-- variable3: string (nullable = true)
 |-- source_date: date (nullable = true)
 |-- load_date: date (nullable = true)
 |-- file_name_string: string (nullable = true)

Ошибка компиляции:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val formattedDf = incomingDf.as[existingSchema].toDF()                                                                                                                                                                                                                                                                               
                                                     ^                                                                                                                                                                                                                                                                                                 
one error found                                                                                                                                                                                                                                                                                                                                        
 FAILED                                                                                                                                                                                                                                                                                                                                                

FAILURE: Build failed with an exception.                                                                                                                                                                                                                                                                                                               

* What went wrong:                                                                                                                                                                                                                                                                                                                                     
Execution failed for task ':compileScala'.                                                                                                                                                                                                                                                                                                             
> Compilation failed 

Обновление: я добавил строку кода:

import incomingDf.sparkSession.implicits._

икомпиляция в порядке.

Теперь я получаю следующую ошибку во время выполнения:

19/04/17 14:37:56 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`variable2`' given input columns: [variable1, variable3, sequence_number, last_name, first_name, file_name_string, subscriber_id, load_date, source_key];
org.apache.spark.sql.AnalysisException: cannot resolve '`variable2`' given input columns: [variable1, variable3, sequence_number, last_name, first_name, file_name_string, subscriber_id, load_date, source_key];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)

Ответы [ 2 ]

3 голосов
/ 17 апреля 2019

Вы, вероятно, хотите специально определить свою схему DF. Например:

import org.apache.spark.sql.types._

val newSchema: StructType = StructType(Array(
  StructField("nested_array", ArrayType(ArrayType(StringType)), true),
  StructField("numbers", IntegerType, true),
  StructField("text", StringType, true)
))

// Given a DataFrame df...
val combinedSchema = StructType(df.schema ++ newSchema)
val resultRDD = ... // here, process df to add rows or whatever and get the result as an RDD
                    // you can get an RDD as simply as df.rdd
val outDf = sparkSession.createDataFrame(resultRDD, combinedSchema)

Третий член аргумента [StructField][1] гарантирует, что вновь созданные поля обнуляются. По умолчанию установлено значение true, поэтому вам не нужно добавлять это, но я включил его для ясности, поскольку вся цель использования этого метода - создать специально обнуляемое поле.

1 голос
/ 17 апреля 2019

В существующей схеме отсутствуют некоторые строковые поля класса case.Вам просто нужно добавить их явно:

val formattedDf = Seq("variable2", "variable4", "variable5")
  .foldLeft(incomingDf)((df, col) => {
    df.withColumn(col, lit(null.asInstanceOf[String]))
  }).as[existingSchema].toDF()

Более общим решением будет вывод недостающих полей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...