У меня есть ежедневный файл, который я обрабатываю, и каждый день меняется схема файла в массиве, который я хочу взорвать, и возникает ошибка, вот схема
Day 1
root
|-- CaseNumber: string (nullable = true)
|-- Interactions: struct (nullable = true)
| |-- EmailInteractions: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- BccAddresses: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- CcAddresses: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- CreatedBy: string (nullable = true)
| | | |-- CreatedOn: string (nullable = true)
| | | |-- Direction: string (nullable = true)
| | | |-- FromAddress: string (nullable = true)
| | | |-- SentOn: string (nullable = true)
| | | |-- Subject: string (nullable = true)
| | | |-- Summary: string (nullable = true)
| | | |-- ToAddresses: array (nullable = true)
| | | | |-- element: string (containsNull = true)
| | | |-- UpdatedOn: string (nullable = true)
| | | |-- id: string (nullable = true)
| |-- PhoneInteractions: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- WebInteractions: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- EntityAction: string (nullable = true)
|-- EventDateTime: string (nullable = true)
Day 2
root
|-- CaseNumber: string (nullable = true)
|-- Interactions: struct (nullable = true)
| |-- PhoneInteractions: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CreatedBy: string (nullable = true)
| | | |-- CreatedOn: string (nullable = true)
| | | |-- Direction: string (nullable = true)
|-- EntityAction: string (nullable = true)
|-- EventDateTime: string (nullable = true)
Я хочу взорвать EmailInteractions, PhoneInteractions, WebInteractions, вот код, который я использую для взлома EmailInteractions
val dl = spark.read.format("com.databricks.spark.avro").load("adl://powerbiconnect.azuredatalakestore.net/SD_Case/sdeventhubspace/sdeventhub/1_2020_01_20_*_*_*.avro")
val dl1=dl.select($"body".cast("string")).map(_.toString())
val dl2=spark.read.json(dl1)
val dl3=dl2.select($"Content.CaseNumber",$"content.Interactions",$"RawProperties.UserProperties.EntityAction",$"RawProperties.UserProperties.EventDateTime")
val windowSpec = Window.partitionBy("caseNumber").orderBy(col("EventDateTime").desc)
val dl4=dl3.withColumn("rnk",row_number().over(windowSpec)).filter( $"rnk"===1)
val Contact=dl4.select( $"CaseNumber" as "CaseNumber",
$"Interactions.EmailInteractions" as "EmailInteractions")
val kk=Contact.select( $"CaseNumber", explode( $"EmailInteractions" ).as( "EmailInteractionsFlat" )
// )
val dl5=dl4.select($"CaseNumber",$"Interactions.EmailInteractions.CreatedOn",$"Interactions.EmailInteractions.Direction")
dl4.printSchema
display(kk)
, поскольку схема согласована, я хочу написать регистр, например, "explode Email Int if it has objects under it if not pass it, explode Web int if it has objects under it if not pass it, same goes for Phone int"
Я новичок в кодирование, любая помощь будет оценена. спасибо