Как проверить мои данные с помощью jsonSchema scala - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть фрейм данных, который выглядит так

+--------------------+----------------+------+------+
|                 id |       migration|number|string|
+--------------------+----------------+------+------+
|[5e5db036e0403b1a.  |mig             |     1| str  |
+--------------------+----------------+------+------+

, и у меня есть jsonSchema:

{
"title": "Section",
"type": "object",
"additionalProperties": false,
"required": ["migration", "id"],
"properties": {
  "migration": {
    "type": "string",
    "additionalProperties": false
  },
  "string": {
    "type": "string"
  },
  "number": {
    "type": "number",
     "min": 0
  }
 }
}

Я хотел бы проверить схему моего фрейма данных с моей jsonSchema. Спасибо

1 Ответ

1 голос
/ 29 апреля 2020

Пожалуйста, найдите комментарии к встроенному коду для объяснения

val newSchema : StructType = DataType.fromJson("""{
                                        |  "type": "struct",
                                        |  "fields": [
                                        |    {
                                        |      "name": "id",
                                        |      "type": "string",
                                        |      "nullable": true,
                                        |      "metadata": {}
                                        |    },
                                        |    {
                                        |      "name": "migration",
                                        |      "type": "string",
                                        |      "nullable": true,
                                        |      "metadata": {}
                                        |    },
                                        |    {
                                        |      "name": "number",
                                        |      "type": "integer",
                                        |      "nullable": false,
                                        |      "metadata": {}
                                        |    },
                                        |    {
                                        |      "name": "string",
                                        |      "type": "string",
                                        |      "nullable": true,
                                        |      "metadata": {}
                                        |    }
                                        |  ]
                                        |}""".stripMargin).asInstanceOf[StructType] // Load you schema from JSON string


//    println(newSchema)
    val spark = Constant.getSparkSess // Create SparkSession object 

    //Correct data  
    val correctData: RDD[Row]  = spark.sparkContext.parallelize(Seq(Row("5e5db036e0403b1a.","mig",1,"str")))
    val dfNew = spark.createDataFrame(correctData, newSchema) // validating the data

    dfNew.show()

    //InCorrect data  
    val inCorrectData: RDD[Row]  = spark.sparkContext.parallelize(Seq(Row("5e5db036e0403b1a.",1,1,"str")))
    val dfInvalid = spark.createDataFrame(inCorrectData, newSchema) // validating the data which will throw RuntimeException: java.lang.Integer is not a valid external type for schema of string 
    dfInvalid.show()
    val res = spark.sql("") // Load the SQL dataframe
    val diffColumn : Seq[StructField] = res.schema.diff(newSchema) // compare SQL dataframe with JSON schema
    diffColumn.foreach(_.name) // Print the Diff columns
...