У меня есть фрейм данных со столбцом функций SparseVector.Мне нужно масштабировать каждую строку скаляром.У меня есть рабочая реализация ниже, которая использует UDF.Ниже показаны исходные и масштабированные столбцы объектов:
+-------------------+-------+-------------------+
| features|weights| scaledFeatures|
+-------------------+-------+-------------------+
|(6,[0,1],[0.5,1.0])| 1.0|(6,[0,1],[0.5,1.0])|
|(6,[2,3],[1.5,2.0])| 2.0|(6,[2,3],[3.0,4.0])|
|(6,[4,5],[0.5,1.0])| 3.0|(6,[4,5],[1.5,3.0])|
+-------------------+-------+-------------------+
Есть ли способ сделать это, используя собственные и оптимизированные методы Spark вместо UDF?
Аналогично, существует ли Spark-родной способ масштабирования SparseVector с помощью скаляра?См. Строку под комментарием «Масштабировать SparseVector» в пользовательской функции, определенной ниже.
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
// Scaling a SparseVector column
val data = Array(
(new SparseVector(6, Array(0,1), Array(0.5, 1.0)), 1.0),
(new SparseVector(6, Array(2,3), Array(1.5, 2.0)), 2.0),
(new SparseVector(6, Array(4,5), Array(0.5, 1.0)), 3.0)
)
val df = spark.createDataFrame(data).toDF("features", "weights")
val scaleUDF = udf((sv: SparseVector, w: Double) => {
// Scale the SparseVector
val unzipped = sv.indices.zip(sv.values).map(iv => (iv._1, iv._2*w)).unzip
new SparseVector(sv.size, unzipped._1, unzipped._2)
})
val scaledDF = df.withColumn("scaledFeatures", scaleUDF(col("features"), col("weights")))
scaledDF.show()
+-------------------+-------+-------------------+
| features|weights| scaledFeatures|
+-------------------+-------+-------------------+
|(6,[0,1],[0.5,1.0])| 1.0|(6,[0,1],[0.5,1.0])|
|(6,[2,3],[1.5,2.0])| 2.0|(6,[2,3],[3.0,4.0])|
|(6,[4,5],[0.5,1.0])| 3.0|(6,[4,5],[1.5,3.0])|
+-------------------+-------+-------------------+