У меня есть Dataset<Row>
, который содержит шесть столбцов, как показано ниже:
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188264901 | 0002019000000| 0 | 0 |Voltage | 5 |
|1554188264901 | 0002019000000| 0 | 0 |SetPoint | 7 |
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188276412 | 0002019000000| 0 | 0 |SetPoint | 10 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
Конечная цель:
Получение последней обновленной строки на основе MAX(time)
для комбинации thingId
, controller
, module
и variableName
.
, поэтому требуемый вывод должен иметь MAX(time)
во всех строках и last_updatedValue для остальных значений variableName.
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
и столбец variableName
имеет два значения ('Voltage'
и 'SetPoint'
) для этого конкретного thingId, контроллера и модуля, поэтому для значения Voltage
в столбце variableName
должно возвращаться последняя обновленная строка для значения Voltage
с MAX(time)
.
как показано ниже, Ожидаемый результат:
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
Что я пробовал:
Я пытался Scalar sub-query
получить это, но столбец внутри подзапроса должен был быть агрегирован, я пытался разными способами, но безуспешно.
Например, этот код ниже:
Dataset<Row> inputds = spark.read().format("avro").load("hdfs://path");
inputds.createOrReplaceTempView("abc");
Dataset<Row> update = spark.sql("select MAX(p.time) max_time, p.thingId, p.controller, p.module, p.variableName, (SELECT d.value from abc d where d.thingId=p.thingId and d.controller=p.controller and d.module=p.module and d.variableName=p.variableName group by thingId,controller,module,variableName,value) as values from abc p")
update.show();
который выдает ошибку:
Коррелированная скалярная переменная должна быть агрегирована для скалярного подзапроса
Как я могу решить это? Пожалуйста, предложите мне, если есть какие-либо обходные пути.
Спасибо!