У меня есть данные в фрейме данных, которые были получены из Azure EventHub.Затем я преобразовываю эти данные в объект json и сохраняю необходимые данные в наборе данных, как показано ниже.
Код для получения данных из Eventhub и сохранения их в фрейме данных.
val connectionString = ConnectionStringBuilder(<ENDPOINT URL>)
.setEventHubName(<EVENTHUB NAME>).build
val currTime = Instant.now
val ehConf = EventHubsConf(connectionString)
.setConsumerGroup("<CONSUMER GRP>")
.setStartingPosition(EventPosition
.fromEnqueuedTime(currTime.minus(Duration.ofMinutes(30))))
.setEndingPosition(EventPosition.fromEnqueuedTime(currTime))
val reader = spark.read.format("eventhubs").options(ehConf.toMap).load()
var SIGNALS = reader
.select(get_json_object(($"body").cast("string"),"$.NUM").alias("NUM"),
get_json_object(($"body").cast("string"),"$.SIG1").alias("SIG1"),
get_json_object(($"body").cast("string"),"$.SIG2").alias("SIG2"),
get_json_object(($"body").cast("string"),"$.SIG3").alias("SIG3"),
get_json_object(($"body").cast("string"),"$.SIG4").alias("SIG4")
)
val SIGNALSFiltered = SIGNALS.filter(col("SIG1").isNotNull &&
col("SIG2").isNotNull && col("SIG3").isNotNull && col("SIG4").isNotNull)
Данныеполученный в SIGNALSFiltered показан ниже.
+-----------------+--------------------+--------------------+--------------------+--------------------+
| NUM| SIG1| SIG2| SIG3| SIG4|
+-----------------+--------------------+--------------------+--------------------+--------------------+
|XXXXX01|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX02|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|[{"TIME":15695604780...|
|XXXXX03|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX04|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX05|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX06|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|[{"TIME":15695605340...|
|XXXXX07|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
|XXXXX08|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|[{"TIME":15695605310...|
Если мы проверим все данные для одной строки, это будет как показано ниже.
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825},{"TIME":1569560475000,"VALUE":3.7812},{"TIME":1569560483000,"VALUE":3.7812},{"TIME":1569560491000,"VALUE":34.7875}]|
[{"TIME":1569560537000,"VALUE":3.7825},{"TIME":1569560481000,"VALUE":34.7825},{"TIME":1569560489000,"VALUE":34.7825},{"TIME":1569560497000,"VALUE":34.7825}]|
[{"TIME":1569560505000,"VALUE":34.7825},{"TIME":1569560513000,"VALUE":34.7825},{"TIME":1569560521000,"VALUE":34.7825},{"TIME":1569560527000,"VALUE":34.7825}]|
[{"TIME":1569560535000,"VALUE":34.7825},{"TIME":1569560479000,"VALUE":34.7825},{"TIME":1569560487000,"VALUE":34.7825}]
Я хочу тольконаивысшая пара TIME из каждого столбца, а не все пары TIME VALUE.Вывод должен быть таким, как показано ниже.
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
| NUM| SIG1| SIG2| SIG3| SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":4.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":5.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":6.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":7.7825}]|
|XXXXX03|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":9.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":8.7825}]|
- Как выполнить итерацию по каждому столбцу в каждой строке и получить наибольшую пару TIME-VALUE?
После получения наибольшего значения вкаждый столбец (SIG1, .... SIG4) должен обновлять только значение ВРЕМЕНИ во всех столбцах с самым высоким из них.
Есть ли способ преобразовать базовый набор данных, как показано ниже?.Каждый элемент в столбце должен быть преобразован в новую строку.
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
| NUM| SIG1| SIG2| SIG3| SIG4|
+-----------------+-----------------------------+---------------------------------------+---------------------------------------+----------------------------------------+
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]| null |[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX01|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|
|XXXXX02|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|[{"TIME":1569560531000,"VALUE":3.7825}]|```
Any leads or help is appreciated! Thanks in Advance.