You can achieve this with a window function: compute the row-wise greatest of A, B and C, then keep only the rows where that value equals the maximum for the DeviceID partition:
import pyspark.sql.functions as F
from pyspark.sql import Window

# assumes an active SparkSession is available as `spark`
l = [('00234', '11-03-2014 05:55', 5.6, 2.3, 3.3),
     ('00235', '11-03-2014 05:33', 2.8, 0.9, 4.2),
     ('00236', '11-03-2014 06:15', 3.5, 0.1, 1.3),
     ('00234', '11-03-2014 07:23', 2.5, 0.2, 3.9),
     ('00236', '11-03-2014 07:33', 2.5, 4.5, 2.9)]
columns = ['DeviceID', 'TimeStamp', 'A', 'B', 'C']
df = spark.createDataFrame(l, columns)

w = Window.partitionBy('DeviceID')
# row-wise maximum of A, B and C
df = df.select('DeviceID', 'TimeStamp', F.greatest('A', 'B', 'C').alias('max_value'))
# keep only the rows whose row-wise maximum equals the per-device maximum
df.withColumn('bla', F.max('max_value').over(w)) \
  .where(F.col('max_value') == F.col('bla')) \
  .drop('bla').show()
Output:
+--------+----------------+---------+
|DeviceID|       TimeStamp|max_value|
+--------+----------------+---------+
| 00236|11-03-2014 07:33| 4.5|
| 00234|11-03-2014 05:55| 5.6|
| 00235|11-03-2014 05:33| 4.2|
+--------+----------------+---------+
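Note that if two rows in the same partition share the maximum value, the filter above keeps both of them. A minimal sketch of a variant that always returns exactly one row per device uses row_number over an ordered window (the window name w2 is just illustrative):
w2 = Window.partitionBy('DeviceID').orderBy(F.col('max_value').desc())
df.withColumn('rn', F.row_number().over(w2)) \
  .where(F.col('rn') == 1) \
  .drop('rn').show()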