У меня есть этот набор данных:
ID timestamp value
unique1 1584420000 120
unique1 1584410000 100
unique1 1584400000 20
unique2 1584410000 90
unique2 1584400000 10
unique3 1584400000 30
Мне нужно рассчитать значение для идентификатора и версии в зависимости от предыдущей версии того же идентификатора. Если у идентификатора не была последняя версия, значение остается прежним
ID timestamp valueCalculated
unique1 1584420000 20
unique1 1584410000 80
unique1 1584400000 20
unique2 1584410000 80
unique2 1584400000 10
unique3 1584400000 30
Я пытался добиться этого, но я могу агрегировать только по идентификатору и версии и выполнить минус из последние две версии доступны (если идентификатор не был обновлен, он сохранит свое значение), который дает только одну строку для каждого идентификатора.
ID timestamp valueCalculated
unique1 1584420000 20
unique2 1584410000 80
unique3 1584400000 30
Это мой код:
dataset.groupBy("id","timestamp")
.agg(
max("timestamp").as("timestamp"),
functionscallUDF("CalculateValue",first("timestamp"),first("value"),last("timestamp"),last("value")
).as("valueCalculated")
i Я использовал UDF4 для вычисления ожидаемого значения:
sparksession.udf().register("CalculatValue", (UDF4<Long,Double,Long,Double,Double>) this::calculateValue , DataTypes.DoubleType);
public Double calculateValue(Long Version1, Double Value1,Long Version2, Double Value2){
if(version1.equals(version2)){
return value1;
}else{
return value1 - value2;
}
}
Я не думаю, что я использую хороший подход здесь из-за агрегации. Не могли бы вы помочь в достижении этого? Спасибо