Предположим, у нас есть pyspark для фрейма данных с:
-
key
столбцом, по которому мы хотим агрегировать, filter
столбцом, который используется для фильтрациинекоторые агрегации (но не все!) - столбец
value
для выполнения вычислений.
Как показано ниже:
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.window import Window
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('key1','filter1',1),
('key1','filter1',2),
('key1','filter2',4),
('key2','filter1',8),
('key2','filter1',16),
('key2','filter1',32),
('key2','filter2',64)],
['key','filter','value'])
Теперь я хочу собрать и вычислить: среднее значение value
на key
и сумму value
на key
, когда filter=='filter1'
.
. Я могу сделать это с помощью groupby
:
df.groupby('key').agg(
F.avg(F.col('value')).alias('value_mean'),
F.sum(F.when(F.col('filter')== 'filter1' ,F.col('value'))).alias('filter1_value_sum')).show()
Что дает:
key value_mean filter1_value_sum
key1 2.333333333335 3
key2 30.0 56
Но я хотел бы сделать это, используя оконную функцию, чтобы разумно разбить мой агрегированный кадр данных для последующего сохранения.
Я пробовал следующее:
window = Window.partitionBy('key').orderBy('filter')
df.select(df['key'],df['filter'],
F.avg(df['value']).over(window).alias('value_mean'),
F.sum(F.when(F.col('filter') == 'filter1',F.col('value'))).over(window).alias('filter1_value_sum'),
F.row_number().over(window).alias('rank')).filter(
F.col('rank') == 1).drop('rank').show()
Что приводит к следующему:
key value_mean filter1_value_sum
key1 1.5 3
key2 18.666666666668 56
Как видите, значение, вычисленное в value_mean
, учитывает только строки, удовлетворяющиеfilter == 'filter1
.
Можно ли получить результат примера groupby
с помощью оконной функции?