Замените существующую Spark ML PipelineModel новыми данными - PullRequest
0 голосов
/ 12 ноября 2019

Я использую Spark Structured Streaming - более или менее - для сопоставления моих данных с DecisionTreeRegressor.

Я хотел бы повторно использовать уже установленную модель PipelineModel для повторного размещения в новых данных. Является ли это возможным? Я уже пытался загрузить обратно свою PipelineModel и добавить ее этапы в конвейер и подогнать данные к новой модели.

      val modelDirectory =  "/mnt/D834B3AF34B38ECE/DEV/hadoop/model"
      var model : PipelineModel = _
      var newModel : PipelineModel = _
      var pipeline : Pipeline = _

        ..........

      val trainingData = //an instance of a dataframne
      val testData = //an instance of a dataframne

      val assembler = new VectorAssembler()
        .setInputCols(Array("routeId", "stopId", "month","dayOfWeek","hour","temperature","humidity","pressure","rain","snow","visibility"))
        .setOutputCol("features")

      val dt = new DecisionTreeRegressor()
        .setLabelCol("value")
        .setFeaturesCol("features")
        .setImpurity("variance")
        .setMaxDepth(30)
        .setMaxBins(32)
        .setMinInstancesPerNode(5)

      pipeline = new Pipeline()

      try {
        model = PipelineModel.load(modelDirectory)
        pipeline.setStages(model.stages)
      } catch {
        case iie: InvalidInputException => {
          pipeline.setStages(Array(assembler,dt))
          printf(iie.getMessage)
        }
        case unknownError: UnknownError => {
          printf(unknownError.getMessage)
        }
      }

      newModel = pipeline.fit(trainingData)

      // Make predictions.
      val predictions: DataFrame = model.transform(testData)

      // Select example rows to display.
      print(s"Predictions based on ${System.currentTimeMillis()} time train: ${System.lineSeparator()}")
      predictions.show(10, false)

      // Select (prediction, true label) and compute test error
      val evaluator = new MulticlassClassificationEvaluator()
        .setLabelCol("value")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
      val accuracy = evaluator.evaluate(predictions)

Мой полный исходный код можно найти в: https://github.com/Hakuhun/bkk-data-process-spark/blob/master/src/main/scala/hu/oe/bakonyi/bkk/BkkDataDeserializer.scala

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...