Как инкрементально загрузить, согласовать с новыми данными, сохранить модель конвейера при использовании искры? - PullRequest
0 голосов
/ 20 апреля 2019

Любые указатели для поэтапного обучения и построения модели и получения прогноза для одного элемента.

Попытка запустить веб-приложение запишет данные в csv по общему пути, а приложение ml прочитает данные и загрузит модель, попытается подогнать данные и сохранить модель, преобразовать тестовые данные.(Это должно происходить в цикле)

Но при повторной загрузке сохраненной модели перед следующим исключением (для нормализации данных используется масштабатор minmax)

Исключение впоток "основной" java.lang.IllegalArgumentException: выходной столбец features_intermediate уже существует.

Любые указатели будут высоко оценены, спасибо

object RunAppPooling {

  def main(args: Array[String]): Unit = { // start the spark session
        val conf = new SparkConf().setMaster("local[2]").set("deploy-mode", "client").set("spark.driver.bindAddress", "127.0.0.1")
      .set("spark.broadcast.compress", "false")
      .setAppName("local-spark")

    val spark = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()


    val filePath = "src/main/resources/train.csv"
    val modelPath = "file:///home/vagrant/custom.model"

    val schema = StructType(
      Array(
        StructField("IDLE_COUNT", IntegerType),
        StructField("TIMEOUTS", IntegerType),
        StructField("ACTIVE_COUNT", IntegerType),
        StructField("FACTOR_LOAD", DoubleType)))
   while(true){
    // read the raw data
    val df_raw = spark
      .read
      .option("header", "true")
      .schema(schema)
      .csv(filePath)

    df_raw.show()
    println(df_raw.count())
    // fill all na values with 0
    val df = df_raw.na.fill(0)
    df.printSchema()

    // create the feature vector
    val vectorAssembler = new VectorAssembler()
      .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT" ))
      .setOutputCol("features_intermediate")

    var lr1: PipelineModel = null
    try {
      lr1 = PipelineModel.load(modelPath)
    } catch {
      case ie: InvalidInputException => println(ie.getMessage)
    }

    import org.apache.spark.ml.feature.StandardScaler
    val scaler = new StandardScaler().setWithMean(true).setWithStd(true).setInputCol("features_intermediate").setOutputCol("features")

    var pipeline: Pipeline = null
    if (lr1 == null) {
      val lr =
        new LinearRegression()
          .setMaxIter(100)
          .setRegParam(0.1)
          .setElasticNetParam(0.8)
          .setLabelCol("FACTOR_LOAD") // setting label column
      // create the pipeline with the steps
      pipeline = new Pipeline().setStages(Array( vectorAssembler, scaler, lr))
    } else {
      pipeline = new Pipeline().setStages(Array(vectorAssembler, scaler, lr1))
    }

    // create the model following the pipeline steps
    val cvModel = pipeline.fit(df) 

    // save the model
    cvModel.write.overwrite.save(modelPath)

    var testschema = StructType(
      Array(
        StructField("PACKAGE_KEY", StringType),
       StructField("IDLE_COUNT", IntegerType),
        StructField("TIMEOUTS", IntegerType),
        StructField("ACTIVE_COUNT", IntegerType)
      ))

    val df_raw1 = spark
      .read
      .option("header", "true")
      .schema(testschema)
      .csv("src/main/resources/test_pooling.csv")

    // fill all na values with 0
    val df1 = df_raw1.na.fill(0)
    val extracted = cvModel.transform(df1) //.toDF("prediction")
    import org.apache.spark.sql.functions._
    val test = extracted.select(mean(df("FACTOR_LOAD"))).collect()
    println(test.apply(0))
}
  }

}

1 Ответ

0 голосов
/ 21 апреля 2019

Я придумал способ, по крайней мере, обойти исключение, не зная, правильно ли это выполняется или нет. Здесь он идет при создании конвейера после загрузки модели, задайте этапы как только модель, потому что модель имеет уже определено с соответствующей схемой. Не уверен, нормализует ли это новые данные или нет.

  pipeline = new Pipeline().setStages(Array( lr1))
...