Я новичок в Spark и изучаю книгу "Расширенная аналитика с Spark".Код взят из примеров в книге.Когда я пытаюсь запустить следующий код, я получаю исключение Spark Task not serializable
.
val kMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
val centroids: Array[Vector] = kMeansModel.clusterCenters
val clustered = pipelineModel.transform(data)
val threshold = clustered.
select("cluster", "scaledFeatureVector").as[(Int, Vector)].
map { case (cluster, vec) => Vectors.sqdist(centroids(cluster), vec) }.
orderBy($"value".desc).take(100).last
Кроме того, вот как я строю модель:
def oneHotPipeline(inputCol: String): (Pipeline, String) = {
val indexer = new StringIndexer()
.setInputCol(inputCol)
.setOutputCol(inputCol + "_indexed")
val encoder = new OneHotEncoder()
.setInputCol(inputCol + "_indexed")
.setOutputCol(inputCol + "_vec")
val pipeline = new Pipeline()
.setStages(Array(indexer, encoder))
(pipeline, inputCol + "_vec")
}
val k = 180
val (protoTypeEncoder, protoTypeVecCol) = oneHotPipeline("protocol_type")
val (serviceEncoder, serviceVecCol) = oneHotPipeline("service")
val (flagEncoder, flagVecCol) = oneHotPipeline("flag")
// Original columns, without label / string columns, but with new vector encoded cols
val assembleCols = Set(data.columns: _*) --
Seq("label", "protocol_type", "service", "flag") ++
Seq(protoTypeVecCol, serviceVecCol, flagVecCol)
val assembler = new VectorAssembler().
setInputCols(assembleCols.toArray).
setOutputCol("featureVector")
val scaler = new StandardScaler()
.setInputCol("featureVector")
.setOutputCol("scaledFeatureVector")
.setWithStd(true)
.setWithMean(false)
val kmeans = new KMeans().
setSeed(Random.nextLong()).
setK(k).
setPredictionCol("cluster").
setFeaturesCol("scaledFeatureVector").
setMaxIter(40).
setTol(1.0e-5)
val pipeline = new Pipeline().setStages(
Array(protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans))
val pipelineModel = pipeline.fit(data)
Я предполагаю, что проблема со строкой Vectors.sqdist(centroids(cluster), vec)
.По какой-то причине я не могу использовать centroids
в моих расчетах Spark.Я провел некоторое поиск в Google, и я знаю, что эта ошибка возникает, когда «я инициализирую переменную на главном сервере, но затем пытаюсь использовать ее на рабочих», которая в моем случае равна centroids
.Однако я не знаю, как решить эту проблему.
В случае, если вас заинтересовало здесь - это полный код этого учебника в книге. здесь - ссылка на набор данных, который используется в учебном пособии.