Как создать СДР таким образом, чтобы его разделы создавались путем многослойной выборки Исходного СДР? - PullRequest
0 голосов
/ 26 октября 2018

У меня есть СДР с множеством записей и функций, кроме оригинальных СДР. Я хотел создать еще один RDD, далее sampledRDD, чтобы каждый раздел sampledRDD был стратифицирован от исходного RDD.

        Original RDD                            Sampled RDD
+----------+---------------+------+   +----------+---------------+------+   
|rowNumber | other features| label|   |rowNumber | other features| label| 
+----------+---------------+------+   +----------+---------------+------+ 
|1         |some values....|A     |   |1         |some values....|A     #
|2         |some values....|A     |   |2         |some values....|A     # 
|3         |some values....|A     |   |3         |some values....|A     #  
|4         |some values....|A     |   |8         |some values....|B     # <= Partition 0
|5         |some values....|A     |   |9         |some values....|B     #   
|6         |some values....|A     |   |13        |some values....|c     #_______________   
|7         |some values....|A     |   |6         |some values....|A     $  
|8         |some values....|B     |   |7         |some values....|A     $  
|9         |some values....|B     |   |4         |some values....|A     $  
|10        |some values....|B     |   |5         |some values....|A     $ <= Partition 1
|11        |some values....|B     |   |10        |some values....|B     $  
|12        |some values....|c     |   |11        |some values....|B     $  
|13        |some values....|c     |   |12        |some values....|c     $  
+----------+---------------+------+   +----------+---------------+------+  

Это изображение показывает схему вопроса.

Мое решение здесь, но оно очень медленное и, кажется, не подходит для большого или большого набора данных.

def StratifiedPartitions(
                            data:RDD[Row],
                            rate:Double,
                            nPartition:Int,
                            schema:baseSchema,
                            withReplacement:Boolean = false,
                            seed:Long = System.currentTimeMillis()):RDD[Row] = {

    class partitioner(override val numPartitions: Int) extends Partitioner {
      def getPartition(key: Any): Int = key.toString.toInt
    }

    Random.setSeed(seed)
    val r = Random
    val fractions = data.map(r => r.getByte(schema.cIndex)).distinct.map(x => (x, rate)).collectAsMap
    val samples = (0 until nPartition).map(idx =>
      data
        .map(r => (r.getByte(schema.cIndex), r))
        .sampleByKey(withReplacement, fractions, r.nextInt())
        .map(r => (idx, r._2))
    ).reduce(_ union _)

    samples.partitionBy(new partitioner(nPartition)).map(_._2)
  }
...