Мой код использует класс StreamingKMeans
для кластеризации потоковых данных из kafka.Я хочу вычислить ошибку модели (WSSSE), но у класса StreamingKMeansModel
есть эта функция computeCost(RDD<Vector> data)
.Ввод RDD, а не DStream.Я не могу применить эту функцию к DStream.Я решил проблему следующим образом:
trainingData.saveAsTextFiles("/home/hduser/sbt_project/project1/Dstream/")
val rddTraining= ssc.sparkContext.textFile("/home/hduser/sbt_project/project1/Dstream/")
val vectorTraining=rddTraining.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
val WSSSE=model.latestModel().computeCost(vectorTraining)
Но saveAsTextFiles
создайте одну папку для каждого потока и textFile
считывайте из файла, а не из папки.
Могу ли я решить эту проблему?Могу ли я добавить поток в файл?