спарк разбирать внутреннее поле значения json и создавать новые столбцы - PullRequest
0 голосов
/ 01 октября 2018

У меня есть данные json, в которых еще один json имеет поле в виде строки, поэтому я хочу проанализировать это и создать новые столбцы ниже: json

{
    "start": "1234567679",
    "data": "{\"ID\": 123 ,\"changeVlaue\" : 89, \"type\" : \"sensor\"}",
    "end": "1234567689"
}

{
    "start": "1234567889",
    "data": "{\"name\": \"xyz \" ,\"changeState\" : \"Done \",\"mode\" : \"new \"}",
    "end": "1234567989"
}

{
    "start": "1234568679",
    "data": "{\"field\": \"all\" ,\"ChangedBy\" : \"Admin\", \"count\" : 2}",
    "end": "1234568999"
}

из этого json Я хочу создать новые столбцы

 start             changeVlaue     changeState    ChangedBy    end
 1234567679            89            null          null       1234567689
 1234567889           null           Done          null       1234567989
 1234568679           null           null          Admin      1234568679

Одна логика, которую я могу придумать, - это использовать udfs

def getchangeVlaue(s1: String ) = {
    // parse and return changeVlaue
 } 

 def getchangeState(s1: String) = {
    // parse and return changeState
 } 

 def getChangedBy(s1: String) = {
     // parse and return ChangedBy
 }   

 val df = spark.read.json("path to json")

 val tdf = df.withColumn("changeVlaue",getchangeVlaue($"data")).withColumn("changeState",getchangeState($"data")).withColumn("ChangedBy",getchangeState($"data"))

, но с вышеупомянутым решением я не хочу это делать, потому что у меня есть 100 таких разных полей, поэтому мне нужно вызвать withColumn 100раз,

Есть ли лучший способ, как pivot для файлов JSON?

Ответы [ 2 ]

0 голосов
/ 01 октября 2018

Вы можете использовать foldLeft как, например, и другие варианты возможны с использованием UDF, генерируя столбцы:

val df2 = df1
   .columns
   .foldLeft(df1) { (DF, colName) =>
    DF
      .withColumnRenamed(
       colName,
       colName.toLowerCase().replace(" ", "_")
      )
 }

Примените свою собственную логику, это только пример.

0 голосов
/ 01 октября 2018

проверьте это.Я использую спарк 1.6.2

val conf = new SparkConf().setMaster("local[*]").setAppName("testing")
val sc = new SparkContext(conf)

val json =
  """[
    |  {
    |    "start": "1234567679",
    |    "data": "{\"ID\": 123 ,\"changeVlaue\" : 89, \"type\" : \"sensor\"}",
    |    "end": "1234567689"
    |  },
    |  {
    |    "start": "1234567889",
    |    "data": "{\"name\": \"xyz \" ,\"changeState\" : \"Done \",\"mode\" : \"new \"}",
    |    "end": "1234567989"
    |  },
    |  {
    |    "start": "1234568679",
    |    "data": "{\"field\": \"all\" ,\"ChangedBy\" : \"Admin\", \"count\" : 2}",
    |    "end": "1234568999"
    |  }
    |]""".stripMargin

val sqlContext = new SQLContext(sc)
val jsonrdd = sc.parallelize(Seq(json))

val inputDf = sqlContext.read.json(jsonrdd)

import sqlContext.implicits._
val df = inputDf.select("start", "data", "end")

import org.apache.spark.sql.functions.get_json_object

val dfWithData = Seq("ID", "changeVlaue", "type", "name", "changeState", "mode", "field", "ChangedBy", "count").map(
  c => get_json_object($"data", s"$$.$c").alias(c))

val dfData = df.select($"*" +: dfWithData: _*).drop("data")
dfData.show()

+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
|     start|       end|  ID|changeVlaue|  type|name|changeState|mode|field|ChangedBy|count|
+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
|1234567679|1234567689| 123|         89|sensor|null|       null|null| null|     null| null|
|1234567889|1234567989|null|       null|  null|xyz |      Done |new | null|     null| null|
|1234568679|1234568999|null|       null|  null|null|       null|null|  all|    Admin|    2|
+----------+----------+----+-----------+------+----+-----------+----+-----+---------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...