Вопрос 1:
Как изменить пример для правильной работы. Вам нужно подготовить данные как вектор для работы трансформаторов.
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import VectorAssembler
pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)
assembler = VectorAssembler(inputCols=["x"], outputCol="x_vec")
scaler = MinMaxScaler(inputCol="x_vec", outputCol="x_scaled")
pipeline = Pipeline(stages=[assembler, scaler])
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
Вопрос 2:
Чтобы запустить MinMaxScaler для нескольких столбцов, вы можете использовать конвейер, который получает список преобразований, подготовленный с использованием списка:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
columns_to_scale = ["x", "y", "z"]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
Проверьте этот пример конвейера в официальной документации.
В конце концов, вы закончите с результатами в этом формат:
>>> scaledData.printSchema()
root
|-- x: long (nullable = true)
|-- y: long (nullable = true)
|-- z: long (nullable = true)
|-- x_vec: vector (nullable = true)
|-- y_vec: vector (nullable = true)
|-- z_vec: vector (nullable = true)
|-- x_scaled: vector (nullable = true)
|-- y_scaled: vector (nullable = true)
|-- z_scaled: vector (nullable = true)
>>> scaledData.show()
+---+---+----+-----+-----+--------+--------+--------+--------------------+
| x| y| z|x_vec|y_vec| z_vec|x_scaled|y_scaled| z_scaled|
+---+---+----+-----+-----+--------+--------+--------+--------------------+
| 0| 1| 100|[0.0]|[1.0]| [100.0]| [0.0]| [0.0]| [0.0]|
| 1| 2| 200|[1.0]|[2.0]| [200.0]| [0.5]| [0.25]|[0.1111111111111111]|
| 2| 5|1000|[2.0]|[5.0]|[1000.0]| [1.0]| [1.0]| [1.0]|
+---+---+----+-----+-----+--------+--------+--------+--------------------+
Дополнительная постобработка:
Вы можете восстановить столбцы с их исходными именами с некоторой последующей обработкой. Например:
from pyspark.sql import functions as f
names = {x + "_scaled": x for x in columns_to_scale}
scaledData = scaledData.select([f.col(c).alias(names[c]) for c in names.keys()])
Вывод будет:
scaledData.show()
+------+-----+--------------------+
| y| x| z|
+------+-----+--------------------+
| [0.0]|[0.0]| [0.0]|
|[0.25]|[0.5]|[0.1111111111111111]|
| [1.0]|[1.0]| [1.0]|
+------+-----+--------------------+