Рассчитать значение в наборе данных spark в зависимости от предыдущей версии с помощью Java Spark - PullRequest
0 голосов
/ 17 марта 2020

У меня есть этот набор данных:

ID      timestamp   value
unique1 1584420000  120
unique1 1584410000  100
unique1 1584400000  20
unique2 1584410000  90
unique2 1584400000  10
unique3 1584400000  30

Мне нужно рассчитать значение для идентификатора и версии в зависимости от предыдущей версии того же идентификатора. Если у идентификатора не была последняя версия, значение остается прежним

ID      timestamp   valueCalculated
unique1 1584420000  20
unique1 1584410000  80
unique1 1584400000  20
unique2 1584410000  80
unique2 1584400000  10
unique3 1584400000  30

Я пытался добиться этого, но я могу агрегировать только по идентификатору и версии и выполнить минус из последние две версии доступны (если идентификатор не был обновлен, он сохранит свое значение), который дает только одну строку для каждого идентификатора.

ID      timestamp   valueCalculated
unique1 1584420000  20
unique2 1584410000  80
unique3 1584400000  30

Это мой код:

dataset.groupBy("id","timestamp")
.agg(
max("timestamp").as("timestamp"),
functionscallUDF("CalculateValue",first("timestamp"),first("value"),last("timestamp"),last("value")
).as("valueCalculated")

i Я использовал UDF4 для вычисления ожидаемого значения:

sparksession.udf().register("CalculatValue", (UDF4<Long,Double,Long,Double,Double>) this::calculateValue , DataTypes.DoubleType);

public Double calculateValue(Long Version1, Double Value1,Long Version2, Double Value2){
if(version1.equals(version2)){
return value1;
}else{
return value1 - value2;
}
}

Я не думаю, что я использую хороший подход здесь из-за агрегации. Не могли бы вы помочь в достижении этого? Спасибо

1 Ответ

0 голосов
/ 17 марта 2020

Я не понимаю, что это за версия, но вы можете вычислить разницу между значением между текущей строкой и предыдущей строкой следующим образом:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("ID").orderBy("timestamp")
df.withColumn("previousValue", lag($"value", 1, 0).over(w))
  .withColumn("valueCalculated", $"value" - $"previousValue")
  .orderBy("ID", "timestamp")
  .show(false)

, который даст вам результат как следует:

+-------+----------+-----+-------------+---------------+
|ID     |timestamp |value|previousValue|valueCalculated|
+-------+----------+-----+-------------+---------------+
|unique1|1584400000|20   |0            |20             |
|unique1|1584410000|100  |20           |80             |
|unique1|1584420000|120  |100          |20             |
|unique2|1584400000|10   |0            |10             |
|unique2|1584410000|90   |10           |80             |
|unique3|1584400000|30   |0            |30             |
+-------+----------+-----+-------------+---------------+
...