Переписать Apache Spark Pipeline, чтобы использовать существующую модель - PullRequest
0 голосов
/ 08 июля 2019

У меня есть конвейер (см. PipeBefore), который:

  • Предварительная обработка данных
  • Тренирует модель
  • Получает прогноз

Тогда я делегировал обучение моделям, и теперь мне нужно только предварительно обработать данные и получить результат прогнозирования. Смотрите конвейерПосле

Как я могу реорганизовать код для использования существующей модели через API Pipeline вместо того, чтобы вызывать трансформаторы вручную?

Разъяснение. Мне нужно интегрировать простую модель, например, org.apache.spark.ml.classification.LogisticRegression, а не ранее обученный org.apache.spark.ml.PipelineModel


    private def pipelineBefore: org.apache.spark.sql.DataFrame = {
      val training = spark.createDataFrame(Seq(
        (0L, "a b c d e spark", 1.0),
        (1L, "b d", 0.0),
        (2L, "spark f g h", 1.0),
        (3L, "hadoop mapreduce", 0.0)
      )).toDF("id", "text", "label")
      println("Pipeline example. Training dataframe before preprocessing")
      training.show()
      // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
      val tokenizer = new Tokenizer()
        .setInputCol("text")
        .setOutputCol("words")
      val hashingTF = new HashingTF()
        .setNumFeatures(1000)
        .setInputCol(tokenizer.getOutputCol)
        .setOutputCol("features")
      val lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.001)
      val pipeline = new Pipeline()
        .setStages(Array(tokenizer, hashingTF, lr))
      // Fit the pipeline to training documents.
      val model = pipeline.fit(training)
      // Prepare test documents, which are unlabeled (id, text) tuples.
      val test = spark.createDataFrame(Seq(
        (4L, "spark i j k"),
        (5L, "l m n"),
        (6L, "spark hadoop spark"),
        (7L, "apache hadoop")
      )).toDF("id", "text")
      // Make predictions on test documents.
      val predictionResult = model.transform(test)
      println("Pipeline example. Prediction result")
      predictionResult.show()
      return predictionResult
    }

    private def pipelineAfter: org.apache.spark.sql.DataFrame = {
      // Given a valid model trained on a preprocessed DataFrame
      val trainedModel = getTrainedModel()
      // Preprocess a test dataset
      val test = spark.createDataFrame(Seq(
        (4L, "spark i j k"),
        (5L, "l m n"),
        (6L, "spark hadoop spark"),
        (7L, "apache hadoop")
      )).toDF("id", "text")
      //HOW TO ADOPT A PIPELINE API HERE ?
      val tokenizer = new Tokenizer()
        .setInputCol("text")
        .setOutputCol("words")
      val hashingTF = new HashingTF()
        .setNumFeatures(1000)
        .setInputCol(tokenizer.getOutputCol)
        .setOutputCol("features")
      val tokenizedTestData = tokenizer.transform(test)
      val hashedTestData = hashingTF.transform(tokenizedTestData)
      println("Preprocessed test data")
      hashedTestData.show()
      // Make predictions on the test dataset.
      val predictionResult = trainedModel.transform(hashedTestData)
      println("Prediction result")
      predictionResult.show()
      return predictionResult
    }

1 Ответ

0 голосов
/ 08 июля 2019

Вам нужно сериализовать ваш конвейер, если вы хотите использовать последний с другой моделью. В вашем примере:

private def pipelineBefore: org.apache.spark.sql.DataFrame = {
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")
    println("Pipeline example. Training dataframe before preprocessing")
    training.show()
    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))
    // Fit the pipeline to training documents.


    // Save your pipeline transformations
    pipeline.write.overwrite().save("/tmp/path")

    // ....
}

Тогда вам нужно загрузить:

  private def pipelineAfter: org.apache.spark.sql.DataFrame = {
    // Given a valid model trained, for example a LR model
    // You can use pipeline model to load your model too
    val trainedModel : LogisticRegressionModel = ???
    // val trainedModel = PipelineModel.load("path_to_your_model")

    // Preprocess a test dataset
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")
    //HOW TO ADOPT A PIPELINE API HERE ?

    // Path where you stored the transform pipeline
    val transformPipeline = PipelineModel.load("/tmp/path")
    val hashedTestData = transformPipeline.transform(test)

    // Make predictions on the test dataset.
    val predictionResult = trainedModel.transform(hashedTestData)
    println("Prediction result")
    predictionResult.show()
    return predictionResult
  }

Проверьте Spark doc , чтобы увидеть более подробную информацию об этом.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...