Пытаясь запустить Spark Pipeline с линейной регрессией, я смог выполнить модель и искал
- Чтобы найти эффективность модели и другие метрики, для которых мне нужно резюме модели Я нашел Python пример, который я прокомментировал ниже для справки.
import org.apache.spark.ml.feature.VectorAssembler
import spark.implicits._
import org.apache.spark.sql
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.{Pipeline, PipelineModel}
val splitDF: Array[Dataset[Row]] = inputDF.randomSplit(Array(0.5, 0.5))
val trainingDF = splitDF(0)
val testingDF = splitDF(1)
val encoder = new OneHotEncoderEstimator()
.setInputCols(Array("_LookUpID"))
.setOutputCols(Array("_LookUpID_Encoded"))
val requiredFeatures = Array("_LookUpID_Encoded","VALUE1")
val assembler = new VectorAssembler()
.setInputCols(requiredFeatures)
.setOutputCol("features")
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
.setFeaturesCol("features")
.setLabelCol("VALUE2")
// Fit the model
val pipeline = new Pipeline()
.setStages(Array(encoder, assembler, lr))
// Fit the pipeline to training documents.
val lrModel = pipeline.fit(trainingDF)
val predictions = lrModel.transform(testingDF)
println("*** Predictions ***")
predictions.printSchema()
predictions.select("VALUE_DATE","_LookUpID","_CD","VALUE1","VALUE2","prediction").show(100)
val rm = new RegressionMetrics(predictions.rdd.map(x => (x(4).asInstanceOf[Double], x(5).asInstanceOf[Double])))
println("sqrt(MSE): " + Math.sqrt(rm.meanSquaredError))
println("R Squared: " + rm.r2)
println("Explained Variance: " + rm.explainedVariance + "\n")
Проглатывание перегородками
def getDataFrame(sql: String, lowerNumber: Int, upperNumber: Int): DataFrame = {
val inputDF: DataFrame =
spark.read.format(source = "jdbc")
.option("url", "jdbc:oracle:thin:@//url")
.option("user", "user")
.option("password", "password")
.option("driver", "oracle.jdbc.OracleDriver")
.option("dbtable", s"($sql)")
.option("partitionColumn", "_LookUpID")
.option("numPartitions", "6")
.option("lowerBound", lowerNumber)
.option("upperBound", upperNumber)
.load()
inputDF
}
Следующей линии не хватает памяти (java .lang.OutOfMemoryError: Java пространство кучи в ...), если я передаю набор данных с 1 миллионами строк (работает нормально при 100 КБ), даже если заданию выделено 32 ГБ Память. Пробовал .cache () inputDF без особого успеха. Это из-за кодировки _LookUpID, что еще я могу сделать по-другому
Обновление : увеличил объем памяти кучи в драйвере вместе с количеством разделов и смог ее решить.
Спасибо