Как оценить производительность модели (точность) в Spark Pipeline с линейной регрессией - PullRequest
0 голосов
/ 12 марта 2020

Пытаясь запустить Spark Pipeline с линейной регрессией, я смог выполнить модель и искал

  1. Чтобы найти эффективность модели и другие метрики, для которых мне нужно резюме модели Я нашел Python пример, который я прокомментировал ниже для справки.
       import org.apache.spark.ml.feature.VectorAssembler
       import spark.implicits._
       import org.apache.spark.sql
       import org.apache.spark.sql.functions._
       import org.apache.spark.sql.types.DecimalType
       import org.apache.spark.sql.{Dataset, Row, SparkSession}
       import org.apache.spark.ml.regression.LinearRegression
       import org.apache.spark.ml.feature.OneHotEncoderEstimator
       import org.apache.spark.ml.{Pipeline, PipelineModel}    

       val splitDF: Array[Dataset[Row]] = inputDF.randomSplit(Array(0.5, 0.5))
        val trainingDF = splitDF(0)
        val testingDF = splitDF(1) 


        val encoder = new OneHotEncoderEstimator()
          .setInputCols(Array("_LookUpID"))
          .setOutputCols(Array("_LookUpID_Encoded"))

        val requiredFeatures = Array("_LookUpID_Encoded","VALUE1")
        val assembler = new VectorAssembler()
          .setInputCols(requiredFeatures)
          .setOutputCol("features")


        val lr = new LinearRegression()
          .setMaxIter(10)
          .setRegParam(0.3)
          .setElasticNetParam(0.8)
          .setFeaturesCol("features")
          .setLabelCol("VALUE2")

        // Fit the model
        val pipeline = new Pipeline()
          .setStages(Array(encoder, assembler, lr))

        // Fit the pipeline to training documents.
        val lrModel = pipeline.fit(trainingDF)

        val predictions = lrModel.transform(testingDF)
        println("*** Predictions ***")
        predictions.printSchema()  

predictions.select("VALUE_DATE","_LookUpID","_CD","VALUE1","VALUE2","prediction").show(100)

        val rm = new RegressionMetrics(predictions.rdd.map(x => (x(4).asInstanceOf[Double], x(5).asInstanceOf[Double])))
        println("sqrt(MSE): " + Math.sqrt(rm.meanSquaredError))
        println("R Squared: " + rm.r2)
        println("Explained Variance: " + rm.explainedVariance + "\n")

Проглатывание перегородками

def getDataFrame(sql: String, lowerNumber: Int, upperNumber: Int): DataFrame = {
 val inputDF: DataFrame = 
 spark.read.format(source = "jdbc")
  .option("url", "jdbc:oracle:thin:@//url")
        .option("user", "user")
        .option("password", "password")
        .option("driver", "oracle.jdbc.OracleDriver")
        .option("dbtable", s"($sql)")
        .option("partitionColumn", "_LookUpID")
        .option("numPartitions", "6")
        .option("lowerBound", lowerNumber)
        .option("upperBound", upperNumber)
        .load()
 inputDF
}
Следующей линии не хватает памяти (java .lang.OutOfMemoryError: Java пространство кучи в ...), если я передаю набор данных с 1 миллионами строк (работает нормально при 100 КБ), даже если заданию выделено 32 ГБ Память. Пробовал .cache () inputDF без особого успеха. Это из-за кодировки _LookUpID, что еще я могу сделать по-другому Обновление : увеличил объем памяти кучи в драйвере вместе с количеством разделов и смог ее решить.

Спасибо

1 Ответ

0 голосов
/ 16 марта 2020

Обновлен вопрос с с RegressionMetrics для выборки RMSE и R Squared et c для метрик

Секционированный набор данных и увеличена куча памяти для драйвера, который на данный момент решает проблемы с памятью. Будет продолжать мониторинг

...