У меня есть СДР с множеством записей и функций, кроме оригинальных СДР. Я хотел создать еще один RDD, далее sampledRDD, чтобы каждый раздел sampledRDD был стратифицирован от исходного RDD.
Original RDD Sampled RDD
+----------+---------------+------+ +----------+---------------+------+
|rowNumber | other features| label| |rowNumber | other features| label|
+----------+---------------+------+ +----------+---------------+------+
|1 |some values....|A | |1 |some values....|A #
|2 |some values....|A | |2 |some values....|A #
|3 |some values....|A | |3 |some values....|A #
|4 |some values....|A | |8 |some values....|B # <= Partition 0
|5 |some values....|A | |9 |some values....|B #
|6 |some values....|A | |13 |some values....|c #_______________
|7 |some values....|A | |6 |some values....|A $
|8 |some values....|B | |7 |some values....|A $
|9 |some values....|B | |4 |some values....|A $
|10 |some values....|B | |5 |some values....|A $ <= Partition 1
|11 |some values....|B | |10 |some values....|B $
|12 |some values....|c | |11 |some values....|B $
|13 |some values....|c | |12 |some values....|c $
+----------+---------------+------+ +----------+---------------+------+
Это изображение показывает схему вопроса.
Мое решение здесь, но оно очень медленное и, кажется, не подходит для большого или большого набора данных.
def StratifiedPartitions(
data:RDD[Row],
rate:Double,
nPartition:Int,
schema:baseSchema,
withReplacement:Boolean = false,
seed:Long = System.currentTimeMillis()):RDD[Row] = {
class partitioner(override val numPartitions: Int) extends Partitioner {
def getPartition(key: Any): Int = key.toString.toInt
}
Random.setSeed(seed)
val r = Random
val fractions = data.map(r => r.getByte(schema.cIndex)).distinct.map(x => (x, rate)).collectAsMap
val samples = (0 until nPartition).map(idx =>
data
.map(r => (r.getByte(schema.cIndex), r))
.sampleByKey(withReplacement, fractions, r.nextInt())
.map(r => (idx, r._2))
).reduce(_ union _)
samples.partitionBy(new partitioner(nPartition)).map(_._2)
}