val vectors = [SOME RDDs]
val clusterised = vectors.groupBy(...)
// Partitioning
val tunedPartitioner = new RangePartitioner(12, clusterised)
val partitionedClusterised = clusterised.partitionBy(tunedPartitioner).persist()
// Calculate the average positions for each cluster
val averagePos = partitionedClusterised.mapValues(averageVectors)
// averagePos: RDD[(index:Int, v:Vector)]
// Update the means
val tempSorted = averagePos.sortByKey().partitionBy(tunedPartitioner).persist()
Выше приведен код, который я использовал для создания RDD (индекс, вектор), который отсортирован по индексу. Я пытался избежать тасования, вызванного sortByKey, но в отладчике «временная сортировка» RDD все еще является ShuffledRDD.
Я понимаю, что было бы более эффективно не разбивать «кластеризованные» сначала, так как преобразование mapValues уменьшало размер значения в PairRDD (кластеризованный - это индекс, итератор (векторы), тогда как AveragePos - это индекс, вектор), но все же я не мог понять, почему tempSorted является ShuffledRDD. Разве каждый раздел не должен сортировать то, что там есть? Что делает tempSorted ShuffledRDD?