df.groupBy("groupCol").agg(max("value")-min("value"))
Исходя из вопроса, отредактированного OP, вот способ сделать это в PySpark.Идея состоит в том, чтобы вычислить номера строк в порядке возрастания и убывания времени для каждой группы и использовать эти значения для вычитания.
from pyspark.sql import Window
from pyspark.sql import functions as func
w_asc = Window.partitionBy(df.groupCol).orderBy(df.time)
w_desc = Window.partitionBy(df.groupCol).orderBy(func.desc(df.time))
df = df.withColumn(func.row_number().over(w_asc).alias('rnum_asc')) \
.withColumn(func.row_number().over(w_desc).alias('rnum_desc'))
df.groupBy(df.groupCol) \
.agg((func.max(func.when(df.rnum_desc==1,df.value))-func.max(func.when(df.rnum_asc==1,df.value))).alias('diff')).show()
Было бы проще, если бы в Spark SQL была доступна оконная функция first_value
.Общий способ решить эту проблему с помощью SQL -
select distinct groupCol,diff
from (
select t.*
,first_value(val) over(partition by groupCol order by time) -
first_value(val) over(partition by groupCol order by time desc) as diff
from tbl t
) t