Преобразовать столбец в наборе данных, у которого есть пары ключ-значение, в разные строки - PullRequest
0 голосов
/ 30 сентября 2019

У меня есть данные в фрейме данных, которые были получены из Azure EventHub. Затем я преобразовываю эти данные в объект json и сохраняю необходимые данные в наборе данных, как показано ниже.

Код для получения данных из Eventhub и сохранения их в фрейме данных.

val connectionString = ConnectionStringBuilder(<ENDPOINT URL>)
    .setEventHubName(<EVENTHUB NAME>).build

val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
    .setConsumerGroup("<CONSUMER GRP>")
    .setStartingPosition(EventPosition
             .fromEnqueuedTime(currTime.minus(Duration.ofMinutes(30))))
    .setEndingPosition(EventPosition.fromEnqueuedTime(currTime))

val reader =  spark.read.format("eventhubs").options(ehConf.toMap).load()

var SIGNALS =  reader
    .select(get_json_object(($"body").cast("string"),"$.NUM").alias("NUM"),
            get_json_object(($"body").cast("string"),"$.SIG1").alias("SIG1"),
            get_json_object(($"body").cast("string"),"$.SIG2").alias("SIG2"),
            get_json_object(($"body").cast("string"),"$.SIG3").alias("SIG3"),
            get_json_object(($"body").cast("string"),"$.SIG4").alias("SIG4")
     )

val SIGNALSFiltered = SIGNALS.filter(col("SIG1").isNotNull &&
    col("SIG2").isNotNull && col("SIG3").isNotNull && col("SIG4").isNotNull)

Данныеполученный в SIGNALSFiltered показан ниже.

+-----------------+--------------------+--------------------+--------------------+--------------------+
|              NUM|                SIG1|                SIG2|                SIG3|                SIG4|
+-----------------+--------------------+--------------------+--------------------+--------------------+
|XXXXX01|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX02|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|
|XXXXX03|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX04|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX05|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX06|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|
|XXXXX07|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX08|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|

Если мы проверим все данные для одной строки, это будет как показано ниже.

|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825},{"TIME":1569560475000,"VALUE":3.7812},{"TIME":1569560483000,"VALUE":1.7812},{"TIME":1569560491000,"VALUE":7.7875}]|
    [{"TIME":1569560537000,"VALUE":3.7825},{"TIME":1569560481000,"VALUE":9.7825},{"TIME":1569560489000,"VALUE":5.7825},{"TIME":1569560497000,"VALUE":34.7825}]|
    [{"TIME":1569560505000,"VALUE":34.7825},{"TIME":1569560513000,"VALUE":9.7825},{"TIME":1569560521000,"VALUE":34.7825},{"TIME":1569560527000,"VALUE":4.7825}]|
    [{"TIME":1569560535000,"VALUE":7.7825},{"TIME":1569560479000,"VALUE":35.7825},{"TIME":1569560487000,"VALUE":3.7825}]

Я хочу преобразоватькаждая пара время-значение в каждом столбце сигналов в новую строку.

Есть ли способ преобразовать базовый набор данных, как показано ниже ?. Каждый элемент в столбце должен быть преобразован в новую строку.

+-----------------+-----------------------------+---------------------------------------+-----------------------------+
|    NUM|    SIG1 TIME| SIG1 VALUE|    SIG2 TIME|   SIG2 VALUE|    SIG3 TIME|   SIG3 VALUE|    SIG4 TIME|  SIG4 VALUE |
+-----------------+-----------------------------+---------------------------------------+-----------------------------+
|XXXXX01|1569560531000|     3.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       2.7825|
|XXXXX01|1569560531000|     1.7825|1569560531000|       1.7825|        null |       null  |1569560531000|       2.7825|
|XXXXX01|1569560531000|     3.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       7.7825|
|XXXXX02|1569560531000|     7.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       2.7825|
|XXXXX02|null         |     null  |1569560531000|       5.7825|1569560531000|       7.7825|1569560531000|       5.7825|
|XXXXX02|1569560531000|     3.7825|1569560531000|       4.7825|1569560531000|       8.7825|1569560531000|       2.7825|
|XXXXX02|1569560531000|     5.7825|1569560531000|       7.7825|1569560531000|       9.7825|1569560531000|       2.7825|

Любые предложения или помощь приветствуется! Заранее спасибо.

Ответы [ 2 ]

1 голос
/ 30 сентября 2019
scala> SIGNALSFiltered.show(false)
+-------+--------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+
|NUM    |SIG1                                                                                                          |SIG2                                                                                                            |SIG3                                                                                                             |SIG4                                                                                 |
+-------+--------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+
|XXXXX01|[{"TIME":11,"VALUE":3.7825},{"TIME":12,"VALUE":3.7812},{"TIME":13,"VALUE":3.7812},{"TIME":14,"VALUE":34.7875}]|[{"TIME":21,"VALUE":3.7825},{"TIME":22,"VALUE":34.7825},{"TIME":23,"VALUE":34.7825},{"TIME":24,"VALUE":34.7825}]|[{"TIME":31,"VALUE":34.7825},{"TIME":32,"VALUE":34.7825},{"TIME":33,"VALUE":34.7825},{"TIME":34,"VALUE":34.7825}]|[{"TIME":41,"VALUE":34.7825},{"TIME":42,"VALUE":34.7825},{"TIME":43,"VALUE":34.7825}]|
+-------+--------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+


scala>  import scala.collection.mutable.ListBuffer
scala>  import org.apache.spark.sql.functions.arrays_zip
scala>  import scala.util.parsing.json._

scala>    def flatTime:UserDefinedFunction = udf((json:String) => {
     |    val pars = JSON.parseFull(json)
     |    var outputList = new ListBuffer[String]()
     |    pars.foreach{ x => 
     |    val y = x.asInstanceOf[List[Any]]
     |    y.foreach{ zz =>
     |    val z =  zz.asInstanceOf[Map[String,Double]]
     |     val tempStr = """[{"TIME" : """ + z("TIME").toString + """ ,"VALUE": """ +  z("VALUE").toString + """}]"""
     |     outputList += tempStr
     |   }
     |   }
     |   outputList.toList
     |   })

scala> SIGNALSFiltered.withColumn("var", explode(arrays_zip(flatTime(col("SIG1")),flatTime(col("SIG2")),flatTime(col("SIG3")),flatTime(col("SIG4"))))).select(col("NUM"), col("var.0").alias("SIG1"),col("var.1").alias("SIG2"),col("var.2").alias("SIG3"),col("var.3").alias("SIG4")).show(false)
+-------+-----------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+
|NUM    |SIG1                               |SIG2                               |SIG3                               |SIG4                               |
+-------+-----------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+
|XXXXX01|[{"TIME" : 11.0 ,"VALUE": 3.7825}] |[{"TIME" : 21.0 ,"VALUE": 3.7825}] |[{"TIME" : 31.0 ,"VALUE": 34.7825}]|[{"TIME" : 41.0 ,"VALUE": 34.7825}]|
|XXXXX01|[{"TIME" : 12.0 ,"VALUE": 3.7812}] |[{"TIME" : 22.0 ,"VALUE": 34.7825}]|[{"TIME" : 32.0 ,"VALUE": 34.7825}]|[{"TIME" : 42.0 ,"VALUE": 34.7825}]|
|XXXXX01|[{"TIME" : 13.0 ,"VALUE": 3.7812}] |[{"TIME" : 23.0 ,"VALUE": 34.7825}]|[{"TIME" : 33.0 ,"VALUE": 34.7825}]|[{"TIME" : 43.0 ,"VALUE": 34.7825}]|
|XXXXX01|[{"TIME" : 14.0 ,"VALUE": 34.7875}]|[{"TIME" : 24.0 ,"VALUE": 34.7825}]|[{"TIME" : 34.0 ,"VALUE": 34.7825}]|null                               |
+-------+-----------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+
1 голос
/ 30 сентября 2019

Вы можете сделать это, используя функцию explode. Он создаст новую строку для каждого элемента в вашем массиве, а затем вы сможете получить доступ к полям time и value, используя синтаксис точки (доступ к полям структуры). Вот простой пример для первого столбца:

data
.withColumn("sig1_obj", explode($"SIG1"))
.withColumn("sig1_time", $"sig1_obj.time")
.withColumn("sig1_value", $"sig1_obj.value")
.show()

+--------------------+--------------------+-------------+----------+
|                SIG1|            sig1_obj|    sig1_time|sig1_value|
+--------------------+--------------------+-------------+----------+
|[[1569560531000, ...|[1569560531000, 3...|1569560531000|    3.7825|
|[[1569560531000, ...|[1569560475000, 3...|1569560475000|    3.7812|
|[[1569560531000, ...|[1569560483000, 1...|1569560483000|    1.7812|
|[[1569560531000, ...|[1569560491000, 7...|1569560491000|    7.7875|
+--------------------+--------------------+-------------+----------+

Точно так же вы можете обрабатывать и другие столбцы.

Также обратите внимание, что при использовании этого метода он умножит данные, дляВо втором столбце вы получите n*m строк, где n - количество элементов в массиве sig1, m - количество элементов в массиве sig2 и так далее. Если вы не хотите этого, вы можете разбить каждый столбец в отдельном фрейме данных, а затем выполнить полное внешнее объединение этих фреймов данных в некоторых полях (возможно, row_number строк для каждого NUM и объединение в NUM col и row_number)

Редактировать:

Поскольку у вас есть столбец типа StringType, вы можете сначала преобразовать это поле строки в массив структур, используя функцию from_json. В вашем примере это можно сделать следующим образом:

import org.apache.spark.sql.types.{StructField, StructType, ArrayType, StringType}

val schema = ArrayType(StructType(Seq(StructField("TIME", StringType), StructField("VALUE", StringType))))

df.withColumn("sig1_arr", from_json($"SIG1", schema))

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...