Можно также сделать лениво без предварительного расчета размеров.Фильтрация по разделам как минимум с двумя элементами в этом примере
import org.apache.spark.Partitioner
object DemoPartitioner extends Partitioner {
override def numPartitions: Int = 3
override def getPartition(key: Any): Int = key match {
case num: Int => num
}
}
sc.parallelize(Seq((0, "a"), (0, "a"), (0, "a"), (1, "b"), (2, "c"), (2, "c")))
.partitionBy(DemoPartitioner) // create 3 partitions of sizes 3,1,2
.mapPartitions { it =>
val firstElements = it.take(2).toSeq
if (firstElements.size < 2) {
Iterator.empty
} else {
firstElements.iterator ++ it
}
}.foreach(println)
Вывод:
(2,c)
(2,c)
(0,a)
(0,a)
(0,a)
Поэтому раздел 1 только с одним элементом был пропущен