Как добавить столбец в фрейм данных pyspark, который содержит среднее значение одного на основе группировки по другому столбцу - PullRequest
0 голосов
/ 11 января 2019

Это похоже на некоторые другие вопросы, но это не так.

Допустим, у нас есть датафрейм pyspark, как показано ниже:

+-----+------+-----+        
|col1 | col2 | col3| 
+-----+------+-----+        
|A    |   5  |  6  |
+-----+------+-----+        
|A    |   5  |  8  |
+-----+------+-----+        
|A    |   6  |  3  |
+-----+------+-----+        
|A    |   5  |  9  |
+-----+------+-----+        
|B    |   9  |  6  |
+-----+------+-----+        
|B    |   3  |  8  |
+-----+------+-----+        
|B    |   9  |  8  |
+-----+------+-----+        
|C    |  3   |  4  |
+-----+------+-----+        
|C    |  5   |  1  |
+-----+------+-----+        

Я хочу добавить еще один столбец как new_col, который содержит среднее значение col2 на основе группировки по col1. Итак, ответ должен быть следующим

   +-----+------+------+--------+
   |col1 | col2 | col3 | new_col|
   +-----+------+------+--------+
   |  A  |   5  |  6   | 5.25   |
   +-----+------+------+--------+
   |  A  |   5  |  8   | 5.25   |
   +-----+------+------+--------+
   |  A  |   6  |  3   | 5.25   |
   +-----+------+------+--------+
   |  A  |   5  |  9   | 5.25   |
   +-----+------+------+--------+
   |  B  |   9  |  6   | 7      |
   +-----+------+------+--------+
   |  B  |   3  |  8   | 7      |
   +-----+------+------+--------+    
   |  B  |   9  |  8   | 7      |
   +-----+------+------+--------+
   |  C  |   3  |  4   | 4      |
   +-----+------+------+--------+
   |  C  |   5  |  1   | 4      |
   +-----+------+------+--------+

Любая помощь будет оценена.

Ответы [ 2 ]

0 голосов
/ 13 января 2019

Хорошо, после долгих попыток я смог ответить на вопрос сам. Я выкладываю здесь ответ для всех, у кого похожий вопрос. Исходный файл - это CSV-файл.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
#reading the file
df = spark.read.csv('file's name.csv', header=True)
df.show()

выход

+-----+------+-----+        
|col1 | col2 | col3| 
+-----+------+-----+        
|A    |   5  |  6  |
+-----+------+-----+        
|A    |   5  |  8  |
+-----+------+-----+        
|A    |   6  |  3  |
+-----+------+-----+        
|A    |   5  |  9  |
+-----+------+-----+        
|B    |   9  |  6  |
+-----+------+-----+        
|B    |   3  |  8  |
+-----+------+-----+        
|B    |   9  |  8  |
+-----+------+-----+        
|C    |  3   |  4  |
+-----+------+-----+        
|C    |  5   |  1  |
+-----+------+-----+        


from pyspark.sql import functions as func
#Grouping the dataframe based on col1
col1group = df.groupBy('col1')
#Computing the average of col2 based on the grouping on col1
a= col1group.agg(func.avg("col2"))
a.show()

выход

+-----+----------+
|col1 | avg(col2)|
+-----+----------+
| A   |   5.25   |
+-----+----------+
| B   |   7.0    |
+-----+----------+
| C   |   4.0    |
+-----+----------+

Теперь мы объединяем последнюю таблицу с начальным кадром данных, чтобы сгенерировать желаемый кадр данных:

df=test1.join(a, on = 'lable', how = 'inner')
df.show()

выход

   +-----+------+------+---------+
   |col1 | col2 | col3 |avg(col2)|
   +-----+------+------+---------+
   |  A  |   5  |  6   | 5.25    |
   +-----+------+------+---------+
   |  A  |   5  |  8   | 5.25    |
   +-----+------+------+---------+
   |  A  |   6  |  3   | 5.25    |
   +-----+------+------+---------+
   |  A  |   5  |  9   | 5.25    |
   +-----+------+------+---------+
   |  B  |   9  |  6   | 7       |
   +-----+------+------+---------+
   |  B  |   3  |  8   | 7       |
   +-----+------+------+---------+    
   |  B  |   9  |  8   | 7       |
   +-----+------+------+---------+
   |  C  |   3  |  4   | 4       |
   +-----+------+------+---------+
   |  C  |   5  |  1   | 4       |
   +-----+------+------+---------+

Теперь измените имя последнего столбца на то, что мы хотим

df = df.withColumnRenamed('avg(val1)', 'new_col')
df.show()

выход

   +-----+------+------+--------+
   |col1 | col2 | col3 | new_col|
   +-----+------+------+--------+
   |  A  |   5  |  6   | 5.25   |
   +-----+------+------+--------+
   |  A  |   5  |  8   | 5.25   |
   +-----+------+------+--------+
   |  A  |   6  |  3   | 5.25   |
   +-----+------+------+--------+
   |  A  |   5  |  9   | 5.25   |
   +-----+------+------+--------+
   |  B  |   9  |  6   | 7      |
   +-----+------+------+--------+
   |  B  |   3  |  8   | 7      |
   +-----+------+------+--------+    
   |  B  |   9  |  8   | 7      |
   +-----+------+------+--------+
   |  C  |   3  |  4   | 4      |
   +-----+------+------+--------+
   |  C  |   5  |  1   | 4      |
   +-----+------+------+--------+
0 голосов
/ 11 января 2019

Шаг 1: Создание кадра данных.

from pyspark.sql.functions import avg, col
from pyspark.sql.window import Window
values = [('A',5,6),('A',5,8),('A',6,3),('A',5,9),('B',9,6),('B',3,8),('B',9,8),('C',3,4),('C',5,1)]
df = sqlContext.createDataFrame(values,['col1','col2','col3'])
df.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   A|   5|   6|
|   A|   5|   8|
|   A|   6|   3|
|   A|   5|   9|
|   B|   9|   6|
|   B|   3|   8|
|   B|   9|   8|
|   C|   3|   4|
|   C|   5|   1|
+----+----+----+

Шаг 2: Создание другого столбца с mean путем группировки по столбцу A.

w = Window().partitionBy('col1')
df = df.withColumn('new_col',avg(col('col2')).over(w))
df.show()
+----+----+----+-------+
|col1|col2|col3|new_col|
+----+----+----+-------+
|   B|   9|   6|    7.0|
|   B|   3|   8|    7.0|
|   B|   9|   8|    7.0|
|   C|   3|   4|    4.0|
|   C|   5|   1|    4.0|
|   A|   5|   6|   5.25|
|   A|   5|   8|   5.25|
|   A|   6|   3|   5.25|
|   A|   5|   9|   5.25|
+----+----+----+-------+
...