PySpark: поиск значения столбца на основе максимального значения трех других столбцов - PullRequest
3 голосов
/ 19 апреля 2019

Я новичок в Spark.Я использую фрейм данных df следующим образом

DeviceID       TimeStamp           A      B     C
00234       11-03-2014 05:55      5.6    2.3   3.3
00235       11-03-2014 05:33      2.8    0.9   4.2
00236       11-03-2014 06:15      3.5    0.1   1.3
00234       11-03-2014 07:23      2.5    0.2   3.9
00236       11-03-2014 07:33      2.5    4.5   2.9

. Как видно из приведенного выше примера df, для DeviceID 00234 максимальное значение среди A, B и C составляет 5,6.Аналогично для DeviceID 00236 максимальное значение среди A, B и C составляет 4,5.Я хочу получить значение TimeStamp на основе максимального значения для каждого DeviceID.Ясно, что для DeviceID 00234 это 11-03-2014 05:55.

Хотя я не пробовал какой-либо подход, будет ли работать следующий подход?

from pyspark.sql import function as F
max_value = df.groupby('DeviceID').agg(F.greatest('A','B','C').alias('max_value'))
df.withColumn('Max-TimeStamp',where(# please help me in putting the right codes))

Результирующий df должен выглядеть следующим образом

DeviceID    Max_Value     Max-TimeStamp
00234          5.6        11-03-2014 05:55
00236          4.5        11-03-2014 07:33

Любая помощь будет принята с благодарностью.Благодарю.

1 Ответ

1 голос
/ 20 апреля 2019

Этого можно добиться с помощью функции Window :

import pyspark.sql.functions as F
from pyspark.sql import Window

l = [('00234'      , '11-03-2014 05:55',      5.6 ,   2.3 ,  3.3),
     ('00235'      , '11-03-2014 05:33'   ,   2.8,    0.9  , 4.2),
     ('00236'      , '11-03-2014 06:15'  ,    3.5 ,   0.1  , 1.3),
     ('00234'      , '11-03-2014 07:23' ,     2.5  ,  0.2 ,  3.9),
     ('00236'      , '11-03-2014 07:33',      2.5   , 4.5,   2.9)]

columns = ['DeviceID', 'TimeStamp', 'A','B','C']

df=spark.createDataFrame(l, columns)

w = Window.partitionBy('DeviceID')

df = df.select('DeviceID', 'TimeStamp', F.greatest('A','B','C').alias('max_value'))

df.withColumn('bla', F.max('max_value').over(w)).where(F.col('max_value') == F.col('bla')).drop('bla').show()

Вывод:

+--------+----------------+---------+ 
|DeviceID| TimeStamp      |max_value| 
+--------+----------------+---------+ 
|   00236|11-03-2014 07:33|      4.5| 
|   00234|11-03-2014 05:55|      5.6| 
|   00235|11-03-2014 05:33|      4.2| 
+--------+----------------+---------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...