Scala Spark - Разделить столбец JSON на несколько столбцов - PullRequest
0 голосов
/ 06 января 2020

Scala noob, используя Spark 2.3.0 .
Я создаю DataFrame, используя udf, который создает столбец JSON String:

val result: DataFrame = df.withColumn("decrypted_json", instance.decryptJsonUdf(df("encrypted_data")))

он выдает следующее:

+----------------+---------------------------------------+
| encrypted_data | decrypted_json                        |
+----------------+---------------------------------------+
|eyJleHAiOjE1 ...| {"a":547.65 , "b":"Some Data"}        |
+----------------+---------------------------------------+

UDF - это внешний код, который я не могу изменить. Я хотел бы разбить столбец decrypted_ json на отдельные столбцы, чтобы выходной DataFrame был примерно таким:

+----------------+----------------------+
| encrypted_data | a      | b           |
+----------------+--------+-------------+
|eyJleHAiOjE1 ...| 547.65 | "Some Data" |
+----------------+--------+-------------+

Ответы [ 2 ]

1 голос
/ 06 января 2020

Приведенное ниже решение вдохновлено одним из решений, данных @Jacek Laskowski:

import org.apache.spark.sql.types._
val JsonSchema = new StructType()
  .add($"a".string)
  .add($"b".string)
val schema = new StructType()
  .add($"encrypted_data".string)
  .add($"decrypted_json".array(JsonSchema))

val schemaAsJson = schema.json

import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)

import org.apache.spark.sql.functions._

val rawJsons = Seq("""
  {
    "encrypted_data" : "eyJleHAiOjE1",
    "decrypted_json" : [
      {
        "a" : "547.65",
        "b" : "Some Data"
      }
    ]
  }
""").toDF("rawjson")

val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"decrypted_json")) // <-- explode the array field
  .drop("decrypted_json")  // <-- no longer needed
  .select("encrypted_data", "address.*") // <-- flatten the struct field

output

Пожалуйста go - Ссылка для оригинального решения с объяснением.
Надеюсь, это поможет.

0 голосов
/ 06 января 2020

Используя from_jason, вы можете проанализировать JSON в типе Struct, а затем выбрать столбцы из этого фрейма данных. Вам нужно будет знать схему json. Вот как -

    val sparkSession = //create spark session
    import sparkSession.implicits._

    val jsonData = """{"a":547.65 , "b":"Some Data"}"""
    val schema = {StructType(
      List(
        StructField("a", DoubleType, nullable = false),
        StructField("b", StringType, nullable = false)
      ))}

    val df = sparkSession.createDataset(Seq(("dummy data",jsonData))).toDF("string_column","json_column")
    val dfWithParsedJson = df.withColumn("json_data",from_json($"json_column",schema))

    dfWithParsedJson.select($"string_column",$"json_column",$"json_data.a", $"json_data.b").show()

Результат

+-------------+------------------------------+------+---------+
|string_column|json_column                   |a     |b        |
+-------------+------------------------------+------+---------+
|dummy data   |{"a":547.65 , "b":"Some Data"}|547.65|Some Data|
+-------------+------------------------------+------+---------+
...