Конвертируйте фрейм данных в матрицу Spark MLLIB в Scala - PullRequest
0 голосов
/ 05 июля 2018

У меня есть кадр данных Spark с именем df в качестве ввода:

+---------------+---+---+---+---+
|Main_CustomerID| A1| A2| A3| A4|
+---------------+---+---+---+---+
|            101|  1|  0|  2|  1|
|            102|  0|  3|  1|  1|
|            103|  2|  1|  0|  0|
+---------------+---+---+---+---+

Мне нужно собрать значения A1, A2, A3, A4 в матрицу из нескольких миллиметров, например,

dm: org.apache.spark.mllib.linalg.Matrix =
1.0  0.0  2.0  1.0
0.0  3.0  1.0  1.0
2.0  1.0  0.0  0.0

Как мне добиться этого в Scala?

1 Ответ

0 голосов
/ 05 июля 2018

Вы можете сделать это следующим образом, сначала получите все столбцы, которые должны быть включены в матрицу:

import org.apache.spark.sql.functions._

val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))

Затем преобразуйте кадр данных в RDD[Vector]. Поскольку вектор должен содержать двойные числа, это преобразование должно быть выполнено и здесь.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val rdd = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]].rdd
  .zipWithIndex()
  .map{ case(arr, index) => IndexedRow(index, Vectors.dense(arr.map(_.toDouble)))}

Затем преобразуйте rdd в IndexedRowMatrix, который при необходимости можно преобразовать в локальную матрицу:

val dm = new IndexedRowMatrix(rdd).toBlockMatrix().toLocalMatrix()

Для меньших матриц , которые можно собрать в драйвер, есть более простая альтернатива:

val matrixColumns = df.columns.filter(_.startsWith("A")).map(col(_))

val arr = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]]
  .collect()
  .flatten
  .map(_.toDouble)

val rows = df.count().toInt
val cols = matrixColumns.length

// It's necessary to reverse cols and rows here and then transpose
val dm = Matrices.dense(cols, rows, arr).transpose()
...