Выполните итерацию по столбцу в наборе данных, в котором есть массив пар ключ-значение, и найдите пару с максимальным значением - PullRequest
2 голосов
/ 27 сентября 2019

У меня есть данные в фрейме данных, которые были получены из Azure EventHub.Затем я преобразовываю эти данные в объект json и сохраняю необходимые данные в наборе данных, как показано ниже.

Код для получения данных из Eventhub и сохранения их в фрейме данных.

val connectionString = ConnectionStringBuilder(<ENDPOINT URL>)
    .setEventHubName(<EVENTHUB NAME>).build

val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
    .setConsumerGroup("<CONSUMER GRP>")
    .setStartingPosition(EventPosition
             .fromEnqueuedTime(currTime.minus(Duration.ofMinutes(30))))
    .setEndingPosition(EventPosition.fromEnqueuedTime(currTime))

val reader =  spark.read.format("eventhubs").options(ehConf.toMap).load()

var SIGNALS =  reader
    .select(get_json_object(($"body").cast("string"),"$.NUM").alias("NUM"),
            get_json_object(($"body").cast("string"),"$.SIG1").alias("SIG1"),
            get_json_object(($"body").cast("string"),"$.SIG2").alias("SIG2"),
            get_json_object(($"body").cast("string"),"$.SIG3").alias("SIG3"),
            get_json_object(($"body").cast("string"),"$.SIG4").alias("SIG4")
     )

val SIGNALSFiltered = SIGNALS.filter(col("SIG1").isNotNull &&
    col("SIG2").isNotNull && col("SIG3").isNotNull && col("SIG4").isNotNull)

Данныеполученный в SIGNALSFiltered показан ниже.

+-----------------+--------------------+--------------------+--------------------+--------------------+
|              NUM|                SIG1|                SIG2|                SIG3|                SIG4|
+-----------------+--------------------+--------------------+--------------------+--------------------+
|XXXXX01|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX02|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|
|XXXXX03|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX04|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX05|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX06|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|
|XXXXX07|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX08|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|

Если мы проверим все данные для одной строки, это будет как показано ниже.

|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825},{"TIME":1569560475000,"VALUE":3.7812},{"TIME":1569560483000,"VALUE":3.7812},{"TIME":1569560491000,"VALUE":34.7875}]|
    [{"TIME":1569560537000,"VALUE":3.7825},{"TIME":1569560481000,"VALUE":34.7825},{"TIME":1569560489000,"VALUE":34.7825},{"TIME":1569560497000,"VALUE":34.7825}]|
    [{"TIME":1569560505000,"VALUE":34.7825},{"TIME":1569560513000,"VALUE":34.7825},{"TIME":1569560521000,"VALUE":34.7825},{"TIME":1569560527000,"VALUE":34.7825}]|
    [{"TIME":1569560535000,"VALUE":34.7825},{"TIME":1569560479000,"VALUE":34.7825},{"TIME":1569560487000,"VALUE":34.7825}]

Я хочу тольконаивысшая пара TIME из каждого столбца, а не все пары TIME VALUE.Вывод должен быть таким, как показано ниже.

+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|              NUM|                         SIG1|                                   SIG2|                                   SIG3|                                    SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":4.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":5.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":6.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":7.7825}]|
|XXXXX03|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":9.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":8.7825}]|

  1. Как выполнить итерацию по каждому столбцу в каждой строке и получить наибольшую пару TIME-VALUE?
  2. После получения наибольшего значения вкаждый столбец (SIG1, .... SIG4) должен обновлять только значение ВРЕМЕНИ во всех столбцах с самым высоким из них.

  3. Есть ли способ преобразовать базовый набор данных, как показано ниже?.Каждый элемент в столбце должен быть преобразован в новую строку.

+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|    NUM|                                   SIG1|                                   SIG2|                                   SIG3|                                    SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|        null                           |[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|```

Any leads or help is appreciated! Thanks in Advance.

1 Ответ

1 голос
/ 27 сентября 2019

Вы должны написать одну пользовательскую функцию, как показано ниже.который зациклит ваши данные и получит максимальное значение времени. Примечание: UDF только для справки, вы можете изменить его согласно требованию

  1. Как перебирать каждый столбец в каждой строке и получать самую высокую пару TIME-VALUE?
scala> import org.apache.spark.sql.expressions.{UserDefinedFunction} 

scala> def MaxTime:UserDefinedFunction = udf((json:String) => {
   val pars = JSON.parseFull(json)
   var output=""
   pars.foreach{ x => val y = x.asInstanceOf[List[Any]]
     var i = 1
     var TimeMap = scala.collection.mutable.Map[String, Long]()
     var ValueMap = scala.collection.mutable.Map[String, Double]()
     y.foreach{ zz => val z =  zz.asInstanceOf[Map[String,Double]]
       TimeMap(i.toString) =  z("TIME").toLong
       ValueMap(i.toString) =  z("VALUE")
       i = i + 1
     }
   output = """[{"TIME" : """ + TimeMap.maxBy(_._2)._2.toString + """ ,"VALUE": """ + ValueMap(TimeMap.maxBy(_._2)._1) + """}]"""
  } 
output})

scala> SIGNALSFiltered.withColumn("SIG1", MaxTime(col("SIG1")).withColumn("SIG2", MaxTime(col("SIG2")))).withColumn("SIG3", MaxTime(col("SIG3"))).withColumn("SIG4", MaxTime(col("SIG4"))).show(false)
После получения наивысшего значения в каждом столбце (SIG1, .... SIG4) необходимо обновить только значение ВРЕМЕНИ во всех столбцах с наивысшим среди них.

Записать такой же UDF каквыше и передайте полную строку в качестве параметра.Затем проанализируйте значение каждого столбца в Map и получите максимум среди всех столбцов.

...