Разбор scala Json в dataframe - PullRequest
0 голосов
/ 26 июня 2018

образец Json

 "alternateId": [
    {
        "type": "POPID",
        "value": "1-7842-0759-001"
    },
    {
        "type": "CAMID",
        "value": "CAMID 0000-0002-7EC1-02FF-O-0000-0000-2"
    },
    {
        "type": "ProgrammeUuid",
        "value": "1ddb01e2-6146-4e10-bba9-dde40d0ad886"
    }
]

Я хочу обновить существующий фрейм данных с двумя столбцами, эти два столбца - POPID и CAMID. Эти два значения должны быть проанализированы из структуры JSON Я не знаю, как разобрать эту структуру, Можете ли вы помочь мне в том, что мне нужно изменить в методе fetchField. Как указано выше, json POPID размещается первым, а CAMID - вторым, но в реальных jsons его можно разместить в одном из этих 3 мест внутри alternateId.

 val fetchCAMID_udf = udf(fetchCAMID _)
 val fetchPOPID_udf = udf(fetchPOPID _)

 var updatedDf = //Data frame initialize

 updatedDf = updatedDf.withColumn("CAMID", fetchCAMID_udf(col("alternate_id")))
 updatedDf = updatedDf.withColumn("POPID", fetchPOPID_udf(col("alternate_id")))
 updatedDf .show(10,false)


 def fetchCAMID(jsonStr: String): String = {
var CAMID: String = fetchField(jsonStr, "CAMID")
 CAMID
}

 def fetchPOPID(jsonStr: String): String = {
fetchField(jsonStr, "POPID")
}


 def fetchField(jsonStr: String, fieldName: String): String = {
 try {
   implicit val formats = DefaultFormats
   val extractedField = jsonStr match {
    case "(unknown)" => jsonStr
    case _ => {
      val json = JsonMethods.parse(jsonStr)
      val resultExtracted = (json \\ fieldName)
      val result = resultExtracted match {
        case _: JString => resultExtracted.extract[String]
        case _: JInt => resultExtracted.extract[Int].toString
        case _: JObject => "(unknown)"
      }
      result
    }
   }
  extractedField
 }
catch{
  case e: Exception =>{
    log.error(s"Fetch field failed. Field name: $fieldName . Json: $jsonStr")
    "(unknown)"
   }
  }
}

Ответы [ 2 ]

0 голосов
/ 26 июня 2018

вы можете прочитать JSON, используя Spark, и получить его, используя обычные операции искры

val df=spark.read.option("multiLine",true).json("test.json")

 df.select($"alternateId".getItem(0).as("pop"),$"alternateId".getItem(1).as("cam")).select($"pop.value".as("POPID"),$"cam.value".as("CAMID")).show()

+---------------+--------------------+
|          POPID|               CAMID|
+---------------+--------------------+
|1-7842-0759-001|CAMID 0000-0002-7...|
+---------------+--------------------+
0 голосов
/ 26 июня 2018

Измените вашу fetchField функцию следующим образом

def fetchField(jsonStr: String, fieldName: String): String = {
  try {
    val typeAndValue = (JsonMethods.parse("{"+jsonStr+"}") \ "alternateId" \ "type" \\ classOf[JString]).zip(JsonMethods.parse("{"+jsonStr+"}") \ "alternateId" \ "value" \\ classOf[JString])
    typeAndValue.filter(_._1 == fieldName).map(_._2).toList(0)
  }catch{
    case e: Exception =>{
      "(unknown)"
    }
  }
}

и вы получите CAMID и POPID заполненные

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...