Преобразовать столбец данных со строкой json в разные столбцы - PullRequest
0 голосов
/ 21 октября 2019

Я получаю данные из местоположения BLOB-объекта в кадре данных, как показано ниже.

| NUM_ID|                                                                                                                              Event|
+-------+-----------------------------------------------------------------------------------------------------------------------------------+
|XXXXX01|[{"SN":"SIG1","E":1571599398000,"V":19.79},{"SN":"SIG1","E":1571599406000,"V":19.80},{"SN":"SIG2","E":1571599406000,"V":25.30},{...|
|XXXXX02|[{"SN":"SIG1","E":1571599414000,"V":19.79},{"SN":"SIG2","E":1571599414000,"V":19.80},{"SN":"SIG2","E":1571599424000,"V":25.30},{...|

Если мы возьмем одну строку, это будет так, как показано ниже.

|XXXXX01|[{"SN":"SIG1","E":1571599398000,"V":19.79},{"SN":"SIG1","E":1571599406000,"V":19.80},{"SN":"SIG1","E":1571599414000,"V":19.20},{"SN":"SIG2","E":1571599424000,"V":25.30},{"SN":"SIG2","E":1571599432000,"V":19.10},{"SN":"SIG3","E":1571599440000,"V":19.10},{"SN":"SIG3","E":1571599448000,"V":19.10},{"SN":"SIG3","E":1571599456000,"V":19.10},{"SN":"SIG3","E":1571599396000,"V":19.79},{"SN":"SIG3","E":1571599404000,"V":19.79}]

Столбец события имеет разные сигналы, такие как пара E, V.

Схема для этого кадра данных показана ниже.

scala> df.printSchema
root
 |-- NUM_ID: string (nullable = true)
 |-- Event: string (nullable = true)

Я хочу принять несколько сигналов (предположим, мне нужнотолько SIG1 и SIG3) вместе с парами E, V в качестве нового столбца, как показано ниже.

+-------+--------+--------------+------+
| NUM_ID|   Event|             E|     V|
+-------+--------+--------------+------+
|XXXXX01|    SIG1| 1571599398000| 19.79|
|XXXXX01|    SIG1| 1571599406000| 19.80|
|XXXXX01|    SIG1| 1571599414000| 19.20|
|XXXXX01|    SIG3| 1571599440000| 19.10|
|XXXXX01|    SIG3| 1571599448000| 19.10|
|XXXXX01|    SIG3| 1571599406000| 19.10|
|XXXXX01|    SIG3| 1571599396000| 19.70|
|XXXXX01|    SIG3| 1571599404000| 19.70|
+-------+--------+--------------+------+

и окончательный результат должен быть таким, как показано ниже для каждого NUM_ID .

+-------+--------------+------+------+
| NUM_ID|             E|SIG1 V|SIG3 V|    
+-------+--------------+------+------+
|XXXXX01| 1571599398000| 19.79|  null|
|XXXXX01| 1571599406000| 19.80| 19.70|
|XXXXX01| 1571599414000| 19.20|  null|
|XXXXX01| 1571599440000|  null| 19.10|
|XXXXX01| 1571599448000|  null| 19.10|
|XXXXX01| 1571599448000|  null| 19.10|
|XXXXX01| 1571599406000| 19.80| 19.10|
|XXXXX01| 1571599396000|  null| 19.70|
|XXXXX01| 1571599404000|  null| 19.70|
+-------+--------------+------+------+

Цените любые выводы. Заранее спасибо!

Ответы [ 2 ]

1 голос
/ 21 октября 2019

Столбец «Выше» содержит несколько записей подряд, то есть данные должны быть сглажены перед дальнейшей обработкой. Сглаживание данных может быть достигнуто с помощью операции преобразования плоских карт в DataFrame.

Подход заключается в создании сглаженного JSON Dataframe со всеми необходимыми ключами и значениями и, наконец, преобразовании JSON в DataFrame с помощью Spark read json API.

val mapper = new ObjectMapper()
import spark.implicits._

val flatDF = df.flatMap(row => {
  val numId = row.getAs[String]("NUM_ID")
  val event = row.getAs[String]("Event")
  val data = mapper.readValue(event, classOf[Array[java.util.Map[String, String]]])

  data.map(jsonMap => {
    jsonMap.put("NUM_ID", numId)
    mapper.writeValueAsString(jsonMap)
  })
})

val finalDF = spark.read.json(flatDF)

//finalDF Outout
+-------------+-------+----+-----+
|            E| NUM_ID|  SN|    V|
+-------------+-------+----+-----+
|1571599398000|XXXXX01|SIG1|19.79|
|1571599406000|XXXXX01|SIG1| 19.8|
|1571599406000|XXXXX01|SIG2| 25.3|
|1571599414000|XXXXX02|SIG1|19.79|
|1571599414000|XXXXX02|SIG2| 19.8|
|1571599424000|XXXXX02|SIG2| 25.3|
+-------------+-------+----+-----+
0 голосов
/ 25 октября 2019

Это получается путем получения объекта json и взрыва столбца, как показано ниже.

val schema = ArrayType(StructType(Seq(StructField("SN", StringType), StructField("E", StringType), StructField("V", StringType))))
val structDF = fromBlobDF.withColumn("sig_array", from_json($"Event", schema))

val signalsDF = structDF.withColumn("sig_array", explode($"sig_array")).withColumn("SIGNAL", $"sig_array.SN").withColumn("E", $"sig_array.E").withColumn("V", $"sig_array.V").select("NUM_ID","E","SIGNAL","V")

...