Я работаю над преобразованием JSON, который имеет вложенную пару ключ-значение, чтобы автоматически создавать столбцы для ключей и заполнять строки для значений.Я не хочу создавать схему, так как количество столбцов (ключей) будет отличаться для каждого файла.Я использую Spark версии 2.3 и Scala версии 2.11.8.Я не являюсь экспертом по Scala, и я только начал изучать Scala, поэтому оцените ваши замечания, чтобы решить эту проблему.
Вот пример формата JSON
{"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":[{"Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982"},{"Name":"id"},{"Name":"opt_newsletter_email","Value":"boutmathieu@me.com"},{"Name":"parm1","Value":"secure.snnow.ca/orders/summary"},{"Name":"parm2","Value":"fromET"},{"Name":"parm3","Value":"implied"},{"Name":"parm4"},{"Name":"subscribed","Value":"True"},{"Name":"timestamp","Value":"8/6/2019 4:59:00 PM"},{"Name":"list_id","Value":"6"},{"Name":"name","Value":"Event Alerts"},{"Name":"email","Value":"boutmathieu@me.com"},{"Name":"newsletterID","Value":"sports:snnow:event"},{"Name":"subscribeFormIdOrURL"},{"Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"}]}
{"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":[{"Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982"},{"Name":"id"},{"Name":"opt_newsletter_email","Value":"boutmathieu@me.com"},{"Name":"parm1","Value":"secure.snnow.ca/orders/summary"},{"Name":"parm2","Value":"fromET"},{"Name":"parm3","Value":"implied"},{"Name":"parm4"},{"Name":"subscribed","Value":"True"},{"Name":"timestamp","Value":"8/6/2019 4:59:00 PM"},{"Name":"list_id","Value":"7"},{"Name":"name","Value":"Partner & Sponsored Offers"},{"Name":"email","Value":"boutmathieu@me.com"},{"Name":"newsletterID","Value":"sports:snnow:affiliate"},{"Name":"subscribeFormIdOrURL"},{"Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"}]}
Ожидаемый результат введитеописание изображения здесь
Это мой код.
val newDF = spark.read.json("408d392-8c50-425a-a799-355f1783e0be-c000.json")
scala> newDF.printSchema
root
|-- OverallStatus: string (nullable = true)
|-- RequestID: string (nullable = true)
|-- ele: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
| | |-- Value: string (nullable = true)
val jsonDF = newDF.withColumn("colNames", explode($"ele")).select($"RequestID", ($"ColNames"))
scala> jsonDF.printSchema
root
|-- RequestID: string (nullable = true)
|-- setting: struct (nullable = true)
| |-- Name: string (nullable = true)
| |-- Value: string (nullable = true)
val finalDF=jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")
---------------------------------------------------------------------------------------
I am getting this error while creating the finalDF
<console>:39: error: overloaded method value agg with alternatives:
(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame <and>
(exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame <and>
(exprs: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame <and>
(aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame
cannot be applied to (String)
val finalDF=jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")
Любая помощь будет принята с благодарностью