Любые указатели для поэтапного обучения и построения модели и получения прогноза для одного элемента.
Попытка запустить веб-приложение запишет данные в csv по общему пути, а приложение ml прочитает данные и загрузит модель, попытается подогнать данные и сохранить модель, преобразовать тестовые данные.(Это должно происходить в цикле)
Но при повторной загрузке сохраненной модели перед следующим исключением (для нормализации данных используется масштабатор minmax)
Исключение впоток "основной" java.lang.IllegalArgumentException: выходной столбец features_intermediate уже существует.
Любые указатели будут высоко оценены, спасибо
object RunAppPooling {
def main(args: Array[String]): Unit = { // start the spark session
val conf = new SparkConf().setMaster("local[2]").set("deploy-mode", "client").set("spark.driver.bindAddress", "127.0.0.1")
.set("spark.broadcast.compress", "false")
.setAppName("local-spark")
val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
val filePath = "src/main/resources/train.csv"
val modelPath = "file:///home/vagrant/custom.model"
val schema = StructType(
Array(
StructField("IDLE_COUNT", IntegerType),
StructField("TIMEOUTS", IntegerType),
StructField("ACTIVE_COUNT", IntegerType),
StructField("FACTOR_LOAD", DoubleType)))
while(true){
// read the raw data
val df_raw = spark
.read
.option("header", "true")
.schema(schema)
.csv(filePath)
df_raw.show()
println(df_raw.count())
// fill all na values with 0
val df = df_raw.na.fill(0)
df.printSchema()
// create the feature vector
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT" ))
.setOutputCol("features_intermediate")
var lr1: PipelineModel = null
try {
lr1 = PipelineModel.load(modelPath)
} catch {
case ie: InvalidInputException => println(ie.getMessage)
}
import org.apache.spark.ml.feature.StandardScaler
val scaler = new StandardScaler().setWithMean(true).setWithStd(true).setInputCol("features_intermediate").setOutputCol("features")
var pipeline: Pipeline = null
if (lr1 == null) {
val lr =
new LinearRegression()
.setMaxIter(100)
.setRegParam(0.1)
.setElasticNetParam(0.8)
.setLabelCol("FACTOR_LOAD") // setting label column
// create the pipeline with the steps
pipeline = new Pipeline().setStages(Array( vectorAssembler, scaler, lr))
} else {
pipeline = new Pipeline().setStages(Array(vectorAssembler, scaler, lr1))
}
// create the model following the pipeline steps
val cvModel = pipeline.fit(df)
// save the model
cvModel.write.overwrite.save(modelPath)
var testschema = StructType(
Array(
StructField("PACKAGE_KEY", StringType),
StructField("IDLE_COUNT", IntegerType),
StructField("TIMEOUTS", IntegerType),
StructField("ACTIVE_COUNT", IntegerType)
))
val df_raw1 = spark
.read
.option("header", "true")
.schema(testschema)
.csv("src/main/resources/test_pooling.csv")
// fill all na values with 0
val df1 = df_raw1.na.fill(0)
val extracted = cvModel.transform(df1) //.toDF("prediction")
import org.apache.spark.sql.functions._
val test = extracted.select(mean(df("FACTOR_LOAD"))).collect()
println(test.apply(0))
}
}
}