У меня есть набор данных spark sparkDSDS Dataset<Row>
, как показано ниже
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188276412 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
Генерируется ШАГ 1
Dataset<Row> inputDS = readInput.groupby("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");
Ожидаемый результат
+---------------+---------------+----------------+-------+--------------+--------+
| time | thingId | controller | module| variableName | value |
+---------------+---------------+----------------+-------+--------------+--------+
|1554188639406 | 0002019000000| 0 | 0 |Voltage | 9 |
|1554188639406 | 0002019000000| 0 | 0 |SetPoint | 6 |
+---------------+---------------+----------------+-------+--------------+--------+
Max(time)
столбец для этого thingId,controller,module and variableName
Конечная цель - получить последнее обновленное значение для каждого thingId, контроллера, модуля и variableName на основе столбца MAX (time
).
Код
inputDS.createOrReplaceTempView("intermediate");
Dataset<Row> outputDS = spark.sql("select B.time,A.thingId,A.controller,A.module,A.variableName,A.value from intermediate A
inner join (select thingId,controller,module,MAX(time)time from intermediate group by thingId,controller,module) B
on A.thingId=B.thingId and A.controller=B.controller and A.module=B.module");
SQL-запрос работает, как и ожидалось, но использование inner join
не выглядит эффективным
1) Есть ли другой эффективный способ получения ожидаемого результата без внутреннего соединения или эквивалентного условия.
2) было бы замечательно, если бы мы могли получить ожидаемый результат от ШАГ 1
Dataset<Row> intermediate = inputDS.groupby("thingId","controller","module","variableName").agg(max(struct("time","value")).as("time_value_struct")).select("thingId","controller","module","variableName","time_value_struct.*");