У меня есть Spark DataFrame, и я хочу создать карту и сохранить значения как Map [String, Map [String, String]]. Я не получаю идеи сделать это, любая помощь будет оценена.
Ниже представлен формат ввода и вывода:
Вход:
+-----------------+------------+---+--------------------------------+
|relation |obj_instance|obj|map_value |
+-----------------+------------+---+--------------------------------+
|Start~>HInfo~>Mnt|Mnt |Mnt|[Model -> 2000, Version -> 1.0] |
|Start~>HInfo~>Cbl|Cbl-3 |Cbl|[VSData -> XYZVN, Name -> Smart]|
+-----------------+------------+---+--------------------------------+
Выход:
Map(relation -> Start~>HInfo~>Mnt, obj_instance -> Mnt, obj -> Mnt, Mnt -> Map(Model -> 2000, Version -> 1.0))
Map(relation -> Start~>HInfo~>Cbl, obj_instance -> Cbl-3, obj -> Cbl, Cbl -> Map(VSData -> XYZVN, Name -> Smart))
Код, я пытаюсь, но безуспешно:
var resultMap: Map[Any, Any] = Map()
groupedDataSet.foreach( r => {
val key1 = "relation".toString
val value1 = r(0).toString
val key2 = "obj_instance".toString
val value2 = r(1).toString
val key3 = "obj".toString
val value3 = r(2).toString
val key4 = r(2).toString
val value4 = r(3)
resultMap += (key1 -> value1, key2 -> value2, key3 -> value3, key4 -> value4)
})
resultMap.foreach(println)
Пожалуйста, помогите.
Ниже приведен код для создания тестового фрейма данных и столбца карты
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._
object DFToMap extends App {
//Creating SparkSession
lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
.setIfMissing("spark.master", "local[*]")
lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()
import sparkSession.implicits._
// Creating raw DataFrame
val rawTestDF = Seq(("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "VSData", "XYZVN"), ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "Name", "Smart"),
("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Model", "2000"), ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Version", "1.0"))
.toDF("relation", "obj_instance", "obj", "key", "value")
rawTestDF.show(false)
val joinTheMap = udf { json_value: Seq[Map[String, String]] => json_value.flatten.toMap }
val groupedDataSet = rawTestDF.groupBy("relation", "obj_instance", "obj").agg(collect_list(map(col("key"), col("value"))) as "map_value_temp").withColumn("map_value", joinTheMap(col("map_value_temp")))
.drop("map_value_temp")
groupedDataSet.show(false) //This is the Input DataFrame.
}
Окончательный вывод Json с карты:
[{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}
{"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj:"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}]
Примечание. Я не хочу использовать Spark groupBy, pivot, agg, поскольку потоковая передача Spark не поддерживает мультиагрегацию. Поэтому я хочу получить его с чистым Scala кодом. Пожалуйста, помогите.