Вы можете сделать это следующим образом, сначала получите все столбцы, которые должны быть включены в матрицу:
import org.apache.spark.sql.functions._
val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))
Затем преобразуйте кадр данных в RDD[Vector]
. Поскольку вектор должен содержать двойные числа, это преобразование должно быть выполнено и здесь.
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
val rdd = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]].rdd
.zipWithIndex()
.map{ case(arr, index) => IndexedRow(index, Vectors.dense(arr.map(_.toDouble)))}
Затем преобразуйте rdd в IndexedRowMatrix
, который при необходимости можно преобразовать в локальную матрицу:
val dm = new IndexedRowMatrix(rdd).toBlockMatrix().toLocalMatrix()
Для меньших матриц , которые можно собрать в драйвер, есть более простая альтернатива:
val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))
val arr = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]]
.collect()
.flatten
.map(_.toDouble)
val rows = df.count().toInt
val cols = matrixColumns.length
// It's necessary to reverse cols and rows here and then transpose
val dm = Matrices.dense(cols, rows, arr).transpose()