Я хотел бы разделить СДР на количество разделов, соответствующее количеству различных ключей, которые я нашел (в данном случае 3):
СДР: [(1,a), (1,b), (1,c), (2,d), (3,e), (3,f), (3,g), (3,h), (3,i)]
Теперь я делаю, что элементы с одинаковым ключом попадают в один и тот же раздел:
[(1,a), (1,b), (1,c)]
[(2,d)]
[(3,e), (3,f), (3,g), (3,h), (3,i)]
Вот как я делю
val partitionedRDD = rdd.partitionBy(new PointPartitioner(
rdd.keys.distinct().count().asInstanceOf[Int]))
Это класс PoinPartitioner
class PointPartitioner(numParts: Int) extends org.apache.spark.Partitioner{
import org.apache.spark.Partitioner
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
key.hashCode % numPartitions
}
override def equals(other: Any): Boolean = other match
{
case dnp: PointPartitioner =>
dnp.numPartitions == numPartitions
case _ =>
false
}
}
Однако элементы разбалансированы по разделам. Я хотел бы получить RDD-раздел, подобный этому, где все разделы содержат примерно одинаковое количество элементов, соблюдая порядок ключей:
[(1,a), (1,b), (1,c)]
[(2,d), (3,e), (3,f)]
[(3,g), (3,h), (3,i)]
Что я мог попробовать?