Я пишу наивную реализацию Kmeans в Spark для своей домашней работы:
import breeze.linalg.{ Vector, DenseVector, squaredDistance }
import scala.math
def parse(line: String): Vector[Double] = {
DenseVector(line.split(' ').map(_.toDouble))
}
def closest_assign(p: Vector[Double], centres: Array[Vector[Double]]): Int = {
var bestIndex = 1
var closest = Double.PositiveInfinity
for (i <- 0 until centres.length) {
val tempDist = squaredDistance(p, centres(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
}
}
bestIndex
}
val fileroot:String="/FileStore/tables/"
val file=sc.textFile(fileroot+"data.txt")
.map(parse _)
.cache()
val c1=sc.textFile(fileroot+"c1.txt")
.map(parse _)
.collect()
val c2=sc.textFile(fileroot+"c2.txt")
.map(parse _)
.collect()
val K=10
val MAX_ITER=20
var kPoints=c2
for(i<-0 until MAX_ITER){
val closest = file.map(p => (closest_assign(p, kPoints), (p, 1)))
val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }
val newPoints = pointStats.map { pair =>
(pair._1, pair._2._1 * (1.0 / pair._2._2))
}.collectAsMap()
for (newP <- newPoints) {
kPoints(newP._1) = newP._2
}
val tempDist = closest
.map { x => squaredDistance(x._2._1, newPoints(x._1)) }
.fold(0) { _ + _ }
println(i+" time finished iteration (cost = " + tempDist + ")")
}
В теории tempDist
должно становиться все меньше и меньше по мере выполнения программы, но в действительности все идет наоборот.Также я нашел значения c1
и c2
после цикла for(i<-0 until MAX_ITER)
.Но c1
и c2
должны быть val
значениями!Я неправильно загружаю c1
и c2
?c1
и c2
- два разных начальных кластера для данных.