У меня есть конвейер (см. PipeBefore), который:
- Предварительная обработка данных
- Тренирует модель
- Получает прогноз
Тогда я делегировал обучение моделям, и теперь мне нужно только предварительно обработать данные и получить результат прогнозирования. Смотрите конвейерПосле
Как я могу реорганизовать код для использования существующей модели через API Pipeline вместо того, чтобы вызывать трансформаторы вручную?
Разъяснение. Мне нужно интегрировать простую модель, например, org.apache.spark.ml.classification.LogisticRegression, а не ранее обученный org.apache.spark.ml.PipelineModel
private def pipelineBefore: org.apache.spark.sql.DataFrame = {
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
println("Pipeline example. Training dataframe before preprocessing")
training.show()
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
val predictionResult = model.transform(test)
println("Pipeline example. Prediction result")
predictionResult.show()
return predictionResult
}
private def pipelineAfter: org.apache.spark.sql.DataFrame = {
// Given a valid model trained on a preprocessed DataFrame
val trainedModel = getTrainedModel()
// Preprocess a test dataset
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
//HOW TO ADOPT A PIPELINE API HERE ?
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val tokenizedTestData = tokenizer.transform(test)
val hashedTestData = hashingTF.transform(tokenizedTestData)
println("Preprocessed test data")
hashedTestData.show()
// Make predictions on the test dataset.
val predictionResult = trainedModel.transform(hashedTestData)
println("Prediction result")
predictionResult.show()
return predictionResult
}