Spark Task не сериализуем (Array [Vector]) - PullRequest
0 голосов
/ 17 мая 2018

Я новичок в Spark и изучаю книгу "Расширенная аналитика с Spark".Код взят из примеров в книге.Когда я пытаюсь запустить следующий код, я получаю исключение Spark Task not serializable.

val kMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
val centroids: Array[Vector] = kMeansModel.clusterCenters

val clustered = pipelineModel.transform(data)

val threshold = clustered.
      select("cluster", "scaledFeatureVector").as[(Int, Vector)].
      map { case (cluster, vec) => Vectors.sqdist(centroids(cluster), vec) }.
      orderBy($"value".desc).take(100).last

Кроме того, вот как я строю модель:

def oneHotPipeline(inputCol: String): (Pipeline, String) = {
    val indexer = new StringIndexer()
        .setInputCol(inputCol)
        .setOutputCol(inputCol + "_indexed")

    val encoder = new OneHotEncoder()
        .setInputCol(inputCol + "_indexed")
        .setOutputCol(inputCol + "_vec")

    val pipeline = new Pipeline()
        .setStages(Array(indexer, encoder))

    (pipeline, inputCol + "_vec")
}

val k = 180

val (protoTypeEncoder, protoTypeVecCol) = oneHotPipeline("protocol_type")
val (serviceEncoder, serviceVecCol) = oneHotPipeline("service")
val (flagEncoder, flagVecCol) = oneHotPipeline("flag")

// Original columns, without label / string columns, but with new vector encoded cols
val assembleCols = Set(data.columns: _*) --
  Seq("label", "protocol_type", "service", "flag") ++
  Seq(protoTypeVecCol, serviceVecCol, flagVecCol)
val assembler = new VectorAssembler().
  setInputCols(assembleCols.toArray).
  setOutputCol("featureVector")

val scaler = new StandardScaler()
  .setInputCol("featureVector")
  .setOutputCol("scaledFeatureVector")
  .setWithStd(true)
  .setWithMean(false)

val kmeans = new KMeans().
  setSeed(Random.nextLong()).
  setK(k).
  setPredictionCol("cluster").
  setFeaturesCol("scaledFeatureVector").
  setMaxIter(40).
  setTol(1.0e-5)

val pipeline = new Pipeline().setStages(
  Array(protoTypeEncoder, serviceEncoder, flagEncoder, assembler, scaler, kmeans))
val pipelineModel = pipeline.fit(data) 

Я предполагаю, что проблема со строкой Vectors.sqdist(centroids(cluster), vec).По какой-то причине я не могу использовать centroids в моих расчетах Spark.Я провел некоторое поиск в Google, и я знаю, что эта ошибка возникает, когда «я инициализирую переменную на главном сервере, но затем пытаюсь использовать ее на рабочих», которая в моем случае равна centroids.Однако я не знаю, как решить эту проблему.

В случае, если вас заинтересовало здесь - это полный код этого учебника в книге. здесь - ссылка на набор данных, который используется в учебном пособии.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...