Я использую Scala 2.12.7 и Spark 2.4.3, у меня есть алгоритм для вычисления kNN с использованием RDD[(List[Double], String)]
def classifyPoint(point: List[Double], data: RDD[(List[Double], String)], k: Int): String =
{
val sortedDistances = data.map{case (a, b) => (b, Util.euclideanDistance(point, a))}.sortBy(_._2, ascending = true)
val topk = sortedDistances.zipWithIndex().filter(_._2 < k)
val result = topk.map(_._1).map(entry => (entry._1, 1)).reduceByKey(_+_).sortBy(_._2, ascending = false).first()._1
//for debugging purposes, remember to remove
println(s"Point classified as ${result}")
result
}
с RDD[List[Double]]
под названием testVector
, I 'Я хотел бы параллельно вычислить процесс классификации, используя эту строку (функция classifyPoint, ранее описанная в классе),:
val classificationKnn = testVector.map(knnSpark.classifyPoint(_, modelKnn, k))
, где modelKnn
- это RDD[(List[Double], String)]
Однакоэтот подход дает мне ошибку «у этого RDD отсутствует SparkContext», который, насколько я понимаю, связан с вложенными операциями RDD.
Есть ли способ избежать этой проблемы и при этом иметь возможность параллельных вычислений?
Если я преобразую testVector
в список с testVector.collect().toList
, у меня больше не будет той же проблемы, но это также означает отказ от возможности классификации каждой точки параллельно.