Spark: Как эффективно нормализовать все столбцы DataFrame? - PullRequest
0 голосов
/ 13 июля 2020

Я хочу нормализовать все столбцы DataFrame. Я использую следующий метод.

from pyspark.sql.functions import stddev_pop, avg, broadcast, mean, stddev
def normalize(df, columns, select_col=[]):
    aggMean = []
    aggStd = []
    for column in columns:
        aggMean.append(mean(df[column]).alias(column))
        aggStd.append(stddev_pop(df[column]).alias(column))
    averages = df.agg(*aggMean).collect()[0]
    stds = df.agg(*aggStd).collect()[0]
    for column in columns:
        df = df.withColumn(column + "_norm", ((df[column] - averages[column]) / stds[column]))
        select_col.append(column + "_norm")
    df = df.select(select_col)
    return df
normalize(train_df,train_df.columns)

Однако он слишком медленный, когда DataFrame содержит около 10,000 столбцов. Как лучше всего нормализовать такой DataFrame.

Для удобства обсуждения я привожу пример DataFrame,

df = spark.createDataFrame([(1,2,3,4,5,6,7,8,9,10),(2,3,4,5,6,7,8,9,10,11)],
["a","b","c","d","e","f","g","h","i","j"])

1 Ответ

3 голосов
/ 13 июля 2020

Лучше всего использовать масштабатор MinMax в Spark ml: http://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html?highlight=minmaxscaler#pyspark .ml.feature.MinMaxScaler Итак, вам нужно преобразовать столбцы в вектор, масштабировать minmax, преобразовать в массив и, если необходимо, вернуться к отдельным столбцам

import pyspark.sql.functions as F
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.feature import VectorAssembler,MinMaxScaler
from pyspark.sql.types import *
#%%
df = sqlContext.createDataFrame([(1,2,3,4,5,6,7,8,9,10),(2,3,4,5,6,7,8,9,10,11)],schema=["a","b","c","d","e","f","g","h","i","j"])
vecAssembler = VectorAssembler(inputCols=df.columns, outputCol="features",handleInvalid='skip')
normalizer = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1)
pipeline_normalize = Pipeline(stages=[vecAssembler,normalizer])
df_transf = pipeline_normalize.fit(df).transform(df)
#%%
to_array = F.udf(lambda x: (x.toArray().tolist()), ArrayType(DoubleType())) # IF spark 3.0.0 then a api - vectortoarray available
df_array = df_transf.withColumn("scaledFeatures",to_array("scaledFeatures"))
#%%\
df_final= df_array.select([F.col('scaledFeatures')[i].alias(df.columns[i]) for i in range(len(df.columns))])

Примечание: это масштабатор, а не нормализатор. Если вы хотите, чтобы это соответствовало вашей формуле, используйте: http://spark.apache.org/docs/2.4.0/api/python/pyspark.ml.html?highlight=minmaxscaler#pyspark .ml.feature.StandardScaler вместо minmaxscaler

...