Groupby и UDF / UDAF в PySpark при сохранении структуры DataFrame - PullRequest
0 голосов
/ 28 октября 2018

Я новичок в PySpark и борюсь с простыми манипуляциями с кадрами.У меня есть датафрейм, похожий на:

product    period     rating   product_Desc1   product_Desc2 ..... more columns 
a            1         60          foo              xx
a            2         70          foo              xx
a            3         59          foo              xx
b            1         50          bar              yy
b            2         55          bar              yy
c            1         90          foo bar          xy
c            2         100         foo bar          xy

Я хотел бы сгруппировать продукт, добавить столбцы для вычисления арифметических, геометрических и гармонических средних рейтингов , сохраняя при этом остальные столбцы в фрейме данных , которые одинаковы для каждого продукта.

Я пытался сделать это с помощью комбинации встроенных функций и UDF.Например:

a_means = df.groupBy("product").agg(mean("rating").alias("a_mean")
g_means = df.groupBy("product").agg(udf_gmean("rating").alias("g_mean")

где:

def g_mean(x):
  gm = reduce(mul,x)**(1/len(x))
  return gm

udf_gmean = udf(g_mean, FloatType())

Затем я бы соединил выходные данные a_means и g_means с исходным фреймом данных продукта и удалил дубликаты.Тем не менее, этот метод возвращает ошибку, для g_means, утверждая, что «рейтинг» не участвует в groupBy и не является определяемой пользователем функцией агрегирования ....

Я также пытался использовать модуль gmean SciPy, ноЯ получаю сообщение об ошибке, в котором говорится, что ufunc 'log' не подходит для типов ввода, несмотря на то, что, насколько я вижу, все столбцы рейтинга являются целочисленными.

На сайте есть похожие вопросы, но ничегочто я могу найти, что, кажется, решить эту проблему у меня есть.Я был бы очень признателен за помощь, так как она сводит меня с ума!

Заранее спасибо, и я смогу быстро предоставить любую дополнительную информацию сегодня, если я не предоставил достаточно.

Стоит отметитьчто для эффективности я не могу просто преобразовать в Pandas и преобразовать, как я бы сделал это с помощью фрейма данных Pandas ... и я использую Spark 2.2 и не могу обновить!

Ответы [ 2 ]

0 голосов
/ 02 ноября 2018

Несколько проще, чем выше:

from spark_sklearn.group_apply import gapply
from scipy.stats.mstats import gmean
import pandas as pd

def g_mean(_, vals):
gm = gmean(vals["rating"])
return pd.DataFrame(data=[gm])

geoSchema = StructType().add("geo_mean", FloatType())

gMeans = gapply(df.groupby("product"), g_mean, geoSchema)

Возвращает фрейм данных, который затем можно отсортировать и объединить с оригиналом, используя:

df_withGeo = df.join(gMeans, ["product"])

И повторите процедуру для любогоСтолбцы функции типа агрегации, добавляемые в исходный DataFrame ...

0 голосов
/ 31 октября 2018

Как на счет этого

from pyspark.sql.functions import avg
df1 = df.select("product","rating").rdd.map(lambda x: (x[0],(1.0,x[1]*1.0))).reduceByKey(lambda x,y: (x[0]+y[0], x[1]*y[1])).toDF(['product', 'g_mean'])
gdf = df1.select(df1['product'],pow(df1['g_mean._2'],1.0/df1['g_mean._1']).alias("rating_g_mean"))
display(gdf)

+-------+-----------------+
|product|    rating_g_mean|
+-------+-----------------+
|      a|62.81071936240795|
|      b|52.44044240850758|
|      c|94.86832980505137|
+-------+-----------------+


df1 = df.withColumn("h_mean", 1.0/df["rating"])
hdf = df1.groupBy("product").agg(avg(df1["rating"]).alias("rating_mean"), (1.0/avg(df1["h_mean"])).alias("rating_h_mean"))
sdf = hdf.join(gdf, ['product'])
display(sdf)

+-------+-----------+-----------------+-----------------+
|product|rating_mean|    rating_h_mean|    rating_g_mean|
+-------+-----------+-----------------+-----------------+
|      a|       63.0|62.62847514743051|62.81071936240795|
|      b|       52.5|52.38095238095239|52.44044240850758|
|      c|       95.0|94.73684210526315|94.86832980505137|
+-------+-----------+-----------------+-----------------+


fdf = df.join(sdf, ['product'])
display(fdf.sort("product"))


+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+
|product|period|rating|product_Desc1|product_Desc2|rating_mean|    rating_h_mean|    rating_g_mean|
+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+
|      a|     3|    59|          foo|           xx|       63.0|62.62847514743051|62.81071936240795|
|      a|     2|    70|          foo|           xx|       63.0|62.62847514743051|62.81071936240795|
|      a|     1|    60|          foo|           xx|       63.0|62.62847514743051|62.81071936240795|
|      b|     2|    55|          bar|           yy|       52.5|52.38095238095239|52.44044240850758|
|      b|     1|    50|          bar|           yy|       52.5|52.38095238095239|52.44044240850758|
|      c|     2|   100|      foo bar|           xy|       95.0|94.73684210526315|94.86832980505137|
|      c|     1|    90|      foo bar|           xy|       95.0|94.73684210526315|94.86832980505137|
+-------+------+------+-------------+-------------+-----------+-----------------+-----------------+
...