DataFrame для создания DenseMatrix в искре с использованием Scala - PullRequest
0 голосов
/ 04 апреля 2019

Я пытаюсь преобразовать массив данных в плотную матрицу, используя scala. Я не смог найти никаких встроенных функций для этого, вот что я делаю.

import scala.util.Random
import breeze.linalg.DenseMatrix

val featuresDF = (1 to 10)
    .map(_ => (
      Random.nextDouble,Random.nextDouble,Random.nextDouble))
    .toDF("F1", "F2", "F3")

var FeatureArray: Array[Array[Double]] = Array.empty
val features = featuresDF.columns

for(i <- features.indices){
    FeatureArray = FeatureArray :+ featuresDF.select(features(i)).collect.map(_(0).toString).map(_.toDouble)
}

val desnseMat = DenseMatrix(FeatureArray: _*).t

Это работает нормально, и я получаю то, что хочу. Тем не менее, это вызывает исключения OOM в моей среде. Есть ли лучший способ сделать это преобразование. Моя конечная цель - вычислить собственные значения и собственные векторы объектов, используя плотную матрицу.

import breeze.stats.covmat
import breeze.linalg.eig

val covariance = covmat(desnseMat)
val eigen = eig(covariance)

Итак, было бы еще лучше, если бы существовал прямой способ получения собственных значений и собственных векторов из кадра данных. PCA в искровой мл должен выполнять этот расчет с использованием столбца функций. Есть ли способ получить доступ к собственным значениям через PCA?

1 Ответ

0 голосов
/ 30 июня 2019

Прежде всего, попробуйте увеличить вашу оперативную память.

Во-вторых, попробуйте одну из этих функций, используя DenseMatrix в Spark.Обе функции используют одинаковый объем оперативной памяти на моем компьютере.

Я получил 1,34 секунды для анализа строк 201238 в кадре данных с 1 столбцом, каждое из которых содержит несколько значений типа Double:

import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.DataFrame

def getDenseMatrixFromDF(featuresDF:DataFrame):DenseMatrix = {
    val featuresTrain = featuresDF.columns
    val rows = featuresDF.count().toInt

    val newFeatureArray:Array[Double] = featuresTrain
       .indices
       .flatMap(i => featuresDF
       .select(featuresTrain(i))
       .collect())
       .map(r => r.toSeq.toArray).toArray.flatten.flatMap(_.asInstanceOf[org.apache.spark.ml.linalg.DenseVector].values)

    val newCols = newFeatureArray.length / rows
    val denseMat:DenseMatrix = new DenseMatrix(rows, newCols, newFeatureArray, isTransposed=false)
    denseMat
}

ЕслиЯ хочу получить DenseVector из DataFrame с одним столбцом, содержащим только одно значение Double, я получил 0,8 секунды для того же объема данных:

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.DataFrame

def getDenseVectorFromDF(featuresDF:DataFrame):DenseVector = {
    val featuresTrain = featuresDF.columns
    val cols = featuresDF.columns.length

    cols match {
      case i if i>1 => throw new IllegalArgumentException
      case _ => {
        def addArray(acc:Array[Array[Double]],cur:Array[Double]):Array[Array[Double]] = {
          acc :+ cur
        }

        val newFeatureArray:Array[Double] = featuresTrain
          .indices
          .flatMap(i => featuresDF
          .select(featuresTrain(i))
          .collect())
          .map(r => r.toSeq.toArray.map(e => e.asInstanceOf[Double])).toArray.flatten

        val denseVec:DenseVector = new DenseVector(newFeatureArray)
        denseVec
   }
}

Чтобы вычислить собственные значения / собственные векторы, просто проверьте эту ссылку и эта ссылка API

Для вычисления ковариационной матрицы chek эта ссылка и эта ссылка API

...