Добавление столбца в фрейм данных PySpark включает стандартные отклонения столбца на основе группировки по двум другим столбцам - PullRequest
0 голосов
/ 19 января 2019

Предположим, что у нас есть CSV-файл, который был импортирован в виде фрейма данных в PysPark следующим образом

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("file path and name.csv", inferSchema = True, header = True)
df.show()

output

+-----+----+----+
|lable|year|val |
+-----+----+----+
|    A|2003| 5.0|
|    A|2003| 6.0|
|    A|2003| 3.0|
|    A|2004|null|
|    B|2000| 2.0|
|    B|2000|null|
|    B|2009| 1.0|
|    B|2000| 6.0|
|    B|2009| 6.0|
+-----+----+----+

Теперь мы хотим добавить еще один столбец к df, который содержит стандартное отклонение val на основе группировки по двум столбцам lable и year. Итак, вывод должен быть следующим:

+-----+----+----+-----+
|lable|year|val | std |
+-----+----+----+-----+
|    A|2003| 5.0| 1.53|
|    A|2003| 6.0| 1.53|
|    A|2003| 3.0| 1.53|
|    A|2004|null| null|
|    B|2000| 2.0| 2.83|
|    B|2000|null| 2.83|
|    B|2009| 1.0| 3.54|
|    B|2000| 6.0| 2.83|
|    B|2009| 6.0| 3.54|
+-----+----+----+-----+

У меня есть следующие коды, которые работают для небольшого фрейма данных, но не работают для очень большого фрейма данных (около 40 миллионов строк), с которым я сейчас работаю.

import pyspark.sql.functions as f    
a = df.groupby('lable','year').agg(f.round(f.stddev("val"),2).alias('std'))
df = df.join(a, on = ['lable', 'year'], how = 'inner')

Я получаю Py4JJavaError Traceback (most recent call last) ошибку после запуска на моем большом фрейме данных.

Кто-нибудь знает альтернативный способ? Я надеюсь, что ваш путь работает с моим набором данных.

Я использую python3.7.1, pyspark2.4 и jupyter4.4.0

1 Ответ

0 голосов
/ 19 января 2019

Объединение на фрейме данных вызывает много перетасовки данных между исполнителями. В вашем случае вы можете обойтись без объединения. Используйте спецификацию окна для разделения данных на «lable» и «year» и агрегирования в окне.

from pyspark.sql.window import *

windowSpec = Window.partitionBy('lable','year')\
                   .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = df.withColumn("std", f.round(f.stddev("val").over(windowSpec), 2))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...