Применить MinMaxScaler к нескольким столбцам в PySpark - PullRequest
1 голос
/ 18 февраля 2020

Я хочу применить MinMaxScalar PySpark к нескольким столбцам фрейма данных PySpark df. Пока я знаю, как применить его к одному столбцу, например, x.

from pyspark.ml.feature import MinMaxScaler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)

scaler = MinMaxScaler(inputCol="x", outputCol="x")
scalerModel = scaler.fit(df)
scaledData = scalerModel.transform(df)

Что если у меня 100 столбцов? Есть ли способ сделать минимальное-максимальное масштабирование для многих столбцов в PySpark?

Обновление:

Кроме того, как применить MinMaxScalar к целым или двойным значениям? Выдает следующую ошибку:

java.lang.IllegalArgumentException: requirement failed: Column length must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually int.

Ответы [ 2 ]

2 голосов
/ 18 февраля 2020

Вопрос 1:

Как изменить пример для правильной работы. Вам нужно подготовить данные как вектор для работы трансформаторов.

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorAssembler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)

assembler = VectorAssembler(inputCols=["x"], outputCol="x_vec")
scaler = MinMaxScaler(inputCol="x_vec", outputCol="x_scaled")
pipeline = Pipeline(stages=[assembler, scaler])
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)

Вопрос 2:

Чтобы запустить MinMaxScaler для нескольких столбцов, вы можете использовать конвейер, который получает список преобразований, подготовленный с использованием списка:

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
columns_to_scale = ["x", "y", "z"]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)

Проверьте этот пример конвейера в официальной документации.

В конце концов, вы закончите с результатами в этом формат:

>>> scaledData.printSchema() 
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)
 |-- z: long (nullable = true)
 |-- x_vec: vector (nullable = true)
 |-- y_vec: vector (nullable = true)
 |-- z_vec: vector (nullable = true)
 |-- x_scaled: vector (nullable = true)
 |-- y_scaled: vector (nullable = true)
 |-- z_scaled: vector (nullable = true)

>>> scaledData.show()
+---+---+----+-----+-----+--------+--------+--------+--------------------+
|  x|  y|   z|x_vec|y_vec|   z_vec|x_scaled|y_scaled|            z_scaled|
+---+---+----+-----+-----+--------+--------+--------+--------------------+
|  0|  1| 100|[0.0]|[1.0]| [100.0]|   [0.0]|   [0.0]|               [0.0]|
|  1|  2| 200|[1.0]|[2.0]| [200.0]|   [0.5]|  [0.25]|[0.1111111111111111]|
|  2|  5|1000|[2.0]|[5.0]|[1000.0]|   [1.0]|   [1.0]|               [1.0]|
+---+---+----+-----+-----+--------+--------+--------+--------------------+

Дополнительная постобработка:

Вы можете восстановить столбцы с их исходными именами с некоторой последующей обработкой. Например:

from pyspark.sql import functions as f
names = {x + "_scaled": x for x in columns_to_scale}
scaledData = scaledData.select([f.col(c).alias(names[c]) for c in names.keys()])

Вывод будет:

scaledData.show()
+------+-----+--------------------+
|     y|    x|                   z|
+------+-----+--------------------+
| [0.0]|[0.0]|               [0.0]|
|[0.25]|[0.5]|[0.1111111111111111]|
| [1.0]|[1.0]|               [1.0]|
+------+-----+--------------------+
0 голосов
/ 09 мая 2020

Вы можете использовать один экземпляр MinMaxScaler для набора векторов, а не создавать по одному MinMaxScaler на столбец, который вы хотите преобразовать (в данном случае масштабировать).

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler

#1. Your original dataset
#pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
#df = spark.createDataFrame(pdf)

df = spark.createDataFrame([(0, 10.0, 0.1), (1, 1.0, 0.20), (2, 1.0, 0.9)],["x", "y", "z"])

df.show()
+---+----+---+
|  x|   y|  z|
+---+----+---+
|  0|10.0|0.1|
|  1| 1.0|0.2|
|  2| 1.0|0.9|
+---+----+---+

#2. Vector assembled set of features 
# (assemble only the columns you want to MinMax Scale)
assembler = VectorAssembler(inputCols=["x", "y", "z"], 
outputCol="features")
output = assembler.transform(df)

output.show()

+---+----+---+--------------+
|  x|   y|  z|      features|
+---+----+---+--------------+
|  0|10.0|0.1|[0.0,10.0,0.1]|
|  1| 1.0|0.2| [1.0,1.0,0.2]|
|  2| 1.0|0.9| [2.0,1.0,0.9]|
+---+----+---+--------------+

#3. Applying MinMaxScaler to your assembled features 
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# rescale each feature to range [min, max].
scaledData = scaler.fit(output).transform(output)
scaledData.show()

+---+----+---+--------------+---------------+
|  x|   y|  z|      features| scaledFeatures|
+---+----+---+--------------+---------------+
|  0|10.0|0.1|[0.0,10.0,0.1]|  [0.0,1.0,0.0]|
|  1| 1.0|0.2| [1.0,1.0,0.2]|[0.5,0.0,0.125]|
|  2| 1.0|0.9| [2.0,1.0,0.9]|  [1.0,0.0,1.0]|
+---+----+---+--------------+---------------+

Надеюсь, это поможет.

...