разделить строки фрейма данных искры ключом json для создания нового вывода фрейма данных - PullRequest
0 голосов
/ 05 августа 2020

Использование фрейма данных Spark.

scala> val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input: org.apache.spark.sql.DataFrame = [p_id: string, p_meta: string]

scala> df_input.show()
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+

Учитывая этот входной df, возможно ли разделить его на json ключ для создания нового df_output, как показано ниже?

df_output =

p_id    p_meta_key      p_meta_value
 p1         a                1
 p1         b                2
 p2         c                3

Я использую Spark версии 3.0.0 / scala 2.12.x. и я предпочитаю использовать spark.sql.functions._

Ответы [ 2 ]

3 голосов
/ 05 августа 2020

Другой вариант -

from_json + explode

 val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") )
      .toDF("p_id", "p_meta")
    df_input.show(false)
    /**
      * +----+----------------+
      * |p_id|p_meta          |
      * +----+----------------+
      * |p1  |{"a": 1, "b": 2}|
      * |p2  |{"c": 3}        |
      * +----+----------------+
      */

    df_input.withColumn("p_meta", from_json($"p_meta", "map<string, string>", Map.empty[String, String]))
      .selectExpr("p_id", "explode(p_meta) as (p_meta_key, p_meta_value)")
      .show(false)
    /**
      * +----+----------+------------+
      * |p_id|p_meta_key|p_meta_value|
      * +----+----------+------------+
      * |p1  |a         |1           |
      * |p1  |b         |2           |
      * |p2  |c         |3           |
      * +----+----------+------------+
      */
1 голос
/ 05 августа 2020

приведенный ниже код решит вашу проблему, я тестировал это в spark 3.0.0 / scala 2.12.10.


import org.apache.spark.sql.functions._

val df_input = Seq( ("p1", """{"a": 1, "b": 2}"""), ("p2", """{"c": 3}""") ).toDF("p_id", "p_meta")
df_input.show()

/*
+----+----------------+
|p_id|          p_meta|
+----+----------------+
|  p1|{"a": 1, "b": 2}|
|  p2|        {"c": 3}|
+----+----------------+
*/
//UDF to convert JSON to MAP
def convert(str:String):Map[String,String]={
     "(\\w+): (\\w+)".r.findAllIn(str).matchData.map(i => {
     (i.group(1), i.group(2))
     }).toMap
     }
val udfConvert=spark.udf.register("udfConvert",convert _)

//Remove double quotes 
val df=df_input.withColumn("p_meta", regexp_replace($"p_meta", "\"", ""))
df.show()

/*
+----+------------+
|p_id|      p_meta|
+----+------------+
|  p1|{a: 1, b: 2}|
|  p2|      {c: 3}|
+----+------------+
*/

val df1=df.withColumn("new_col",udfConvert($"p_meta"))

/*
+----+------------+----------------+
|p_id|      p_meta|         new_col|
+----+------------+----------------+
|  p1|{a: 1, b: 2}|[a -> 1, b -> 2]|
|  p2|      {c: 3}|        [c -> 3]|
+----+------------+----------------+
*/

df1.select($"p_id",$"p_meta",$"new_col",explode($"new_col")).drop($"p_meta").drop($"new_col").withColumn("p_meta_key",$"key").withColumn("p_mata_value",$"value").drop($"key").drop($"value").show()
/*
+----+----------+------------+
|p_id|p_meta_key|p_mata_value|
+----+----------+------------+
|  p1|         a|           1|
|  p1|         b|           2|
|  p2|         c|           3|
+----+----------+------------+
*/

...