У меня есть такой фрейм данных:
val df = Seq(
("a", Seq(2.0)),
("a", Seq(1.0)),
("a", Seq(0.5)),
("b", Seq(24.0)),
("b", Seq(12.5)),
("b", Seq(6.4)),
("b", Seq(3.2)),
("c", Seq(104.0)),
("c", Seq(107.4))
).toDF("key", "value")
Мне нужно использовать алгоритм, который принимает входные данные объекта DataFrame в разных группах.Чтобы сделать это более понятным, предположим, что я должен использовать StandardScaler для масштабирования по группам.
В пандах я хотел бы сделать что-то подобное (много типов изменений в процессе):
from sklearn.preprocessing import StandardScaler
df.groupby(key) \
.value \
.transform(lambda x: StandardScaler \
.fit_transform(x \
.values \
.reshape(-1,1)) \
.reshape(-1))
Мне нужносделать это в Scala, потому что алгоритм, который мне нужен, это не Scaler, а еще одна вещь, встроенная в Scala.
До сих пор я пытался сделать что-то вроде этого:
import org.apache.spark.ml.feature.StandardScaler
def f(X : org.apache.spark.sql.Column) : org.apache.spark.sql.Column = {
val scaler = new StandardScaler()
.setInputCol("value")
.setOutputCol("scaled")
val output = scaler.fit(X)("scaled")
(output)
}
df.withColumn("scaled_values", f(col("features")).over(Window.partitionBy("key")))
но, конечно, это дает мне ошибку:
command-144174313464261: 21: ошибка: несоответствие типов;найдено: org.apache.spark.sql.Column требуется: org.apache.spark.sql.Dataset [_] val output = scaler.fit (X) ("масштабированный")
Итак, япытаюсь превратить один объект Column в объект DataFrame, но безуспешно.Как мне это сделать?
Если это невозможно, есть ли способ решить эту проблему?
ОБНОВЛЕНИЕ 1
Кажется, я сделал некоторыеошибки в коде, я пытался это исправить (я думаю, что сделал правильно):
val df = Seq(
("a", 2.0),
("a", 1.0),
("a", 0.5),
("b", 24.0),
("b", 12.5),
("b", 6.4),
("b", 3.2),
("c", 104.0),
("c", 107.4)
).toDF("key", "value")
def f(X : org.apache.spark.sql.DataFrame) : org.apache.spark.sql.Column = {
val assembler = new VectorAssembler()
.setInputCols(Array("value"))
.setOutputCol("feature")
val scaler = new StandardScaler()
.setInputCol("feature")
.setOutputCol("scaled")
val pipeline = new Pipeline()
.setStages(Array(assembler, scaler))
val output = pipeline.fit(X).transform(X)("scaled")
(output)
}
someDF.withColumn("scaled_values", f(someDF).over(Window.partitionBy("key")))
Я все еще получаю ошибку:
org.apache.spark.sql.AnalysisException: выражение 'scaled # 1294' не поддерживается внутри оконной функции. ;;
Я не уверен в причине этой ошибки, я попытался наложить псевдоним на столбец, но он не работает.