Scala -Создание карты из Spark DataFrame - PullRequest
0 голосов
/ 26 апреля 2020

У меня есть Spark DataFrame, и я хочу создать карту и сохранить значения как Map [String, Map [String, String]]. Я не получаю идеи сделать это, любая помощь будет оценена.

Ниже представлен формат ввода и вывода:

Вход:

    +-----------------+------------+---+--------------------------------+
    |relation         |obj_instance|obj|map_value                       |
    +-----------------+------------+---+--------------------------------+
    |Start~>HInfo~>Mnt|Mnt         |Mnt|[Model -> 2000, Version -> 1.0] |
    |Start~>HInfo~>Cbl|Cbl-3       |Cbl|[VSData -> XYZVN, Name -> Smart]|
    +-----------------+------------+---+--------------------------------+

Выход:

    Map(relation -> Start~>HInfo~>Mnt, obj_instance -> Mnt, obj -> Mnt, Mnt -> Map(Model -> 2000, Version -> 1.0))
    Map(relation -> Start~>HInfo~>Cbl, obj_instance -> Cbl-3, obj -> Cbl, Cbl -> Map(VSData -> XYZVN, Name -> Smart))  

Код, я пытаюсь, но безуспешно:

   var resultMap: Map[Any, Any] = Map()
   groupedDataSet.foreach( r => {
     val key1 = "relation".toString
     val value1 = r(0).toString
     val key2 = "obj_instance".toString
     val value2 = r(1).toString
     val key3 = "obj".toString
     val value3 = r(2).toString
     val key4 = r(2).toString
     val value4 = r(3)

     resultMap += (key1 -> value1, key2 -> value2, key3 -> value3, key4 -> value4)
   })
     resultMap.foreach(println)

Пожалуйста, помогите.

Ниже приведен код для создания тестового фрейма данных и столбца карты

            import org.apache.spark.SparkConf
            import org.apache.spark.sql.{Column, SparkSession}
            import org.apache.spark.sql.functions._

            object DFToMap extends App {

              //Creating SparkSession
              lazy val conf = new SparkConf().setAppName("df-to-map").set("spark.default.parallelism", "2")
                .setIfMissing("spark.master", "local[*]")
              lazy val sparkSession = SparkSession.builder().config(conf).getOrCreate()

              import sparkSession.implicits._

    // Creating raw DataFrame
          val rawTestDF = Seq(("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "VSData", "XYZVN"), ("Start~>HInfo~>Cbl", "Cbl-3", "Cbl", "Name", "Smart"),
            ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Model", "2000"), ("Start~>HInfo~>Mnt", "Mnt", "Mnt", "Version", "1.0"))
            .toDF("relation", "obj_instance", "obj", "key", "value")

          rawTestDF.show(false)

    val joinTheMap = udf { json_value: Seq[Map[String, String]] => json_value.flatten.toMap }

          val groupedDataSet = rawTestDF.groupBy("relation", "obj_instance", "obj").agg(collect_list(map(col("key"), col("value"))) as "map_value_temp").withColumn("map_value", joinTheMap(col("map_value_temp")))
            .drop("map_value_temp")

          groupedDataSet.show(false)  //This is the Input DataFrame.


            }

Окончательный вывод Json с карты:

    [{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}
    {"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj:"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}]

Примечание. Я не хочу использовать Spark groupBy, pivot, agg, поскольку потоковая передача Spark не поддерживает мультиагрегацию. Поэтому я хочу получить его с чистым Scala кодом. Пожалуйста, помогите.

1 Ответ

0 голосов
/ 26 апреля 2020

Создан пользовательский UDF для анализа и генерации данных в формате JSON.

  import org.json4s.native.JsonMethods._
  import org.json4s._
  import org.json4s.JsonDSL._

  def toJson(relation:String,obj_instance: String,obj: String,map_value: Map[String,String]) = {
    compact(render(
      JObject("relation" -> JString(relation),
        "obj_instance" -> JString(obj_instance),
        "obj" -> JString(obj),
        obj -> map_value)))
  }

  import org.apache.spark.sql.functions._
  val createJson = udf(toJson _)
  val df = Seq(("Start~>HInfo~>Mnt","Mnt","Mnt",Map("Model" -> "2000", "Version" -> "1.0")),("Start~>HInfo~>Cbl","Cbl-3","Cbl",Map("VSData" -> "XYZVN", "Name" -> "Smart"))).toDF("relation","obj_instance","obj","map_value")
  df.select(createJson($"relation",$"obj_instance",$"obj",$"map_value").as("json_map")).show(false)


+-----------------------------------------------------------------------------------------------------------+
|json_map                                                                                                   |
+-----------------------------------------------------------------------------------------------------------+
|{"relation":"Start~>HInfo~>Mnt","obj_instance":"Mnt","obj":"Mnt","Mnt":{"Model":"2000","Version":"1.0"}}   |
|{"relation":"Start~>HInfo~>Cbl","obj_instance":"Cbl-3","obj":"Cbl","Cbl":{"VSData":"XYZVN","Name":"Smart"}}|
+-----------------------------------------------------------------------------------------------------------+

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...