Есть ли способ разделить СДР по строкам? - PullRequest
0 голосов
/ 27 июня 2019

У меня есть куча данных с 20000 строками в JavaRDD. Теперь я хочу сохранить несколько файлов с одинаковым размером (например, 70 строк на файл).

Я попробовал это с приведенным ниже кодом, но из-за того, что он не делится точно, некоторые наборы данных состоят из 69, 70 или 71 строк. Борьба в том, что мне нужны все с одинаковым размером, кроме последней записи (может быть меньше).

Помощь приветствуется !!! Заранее спасибо, ребята!

myString.repartition (286) .saveAsTextFile (OutputPath);

Ответы [ 2 ]

1 голос
/ 27 июня 2019

Вы можете использовать filterByRange сделать что-то вроде (псевдокод):

for i = 0; i < javaRDD.size ; i+= 70
    val tempRDD = javaRDD.filterByRange(i,i+70).repartition(1)
    tempRDD.saveAsTextFile(outputPath + i.toString());
0 голосов
/ 27 июня 2019

К сожалению, ответ Scala, но он работает.

Сначала определите пользовательский разделитель:

class IndexPartitioner[V](n_per_part: Int, rdd: org.apache.spark.rdd.RDD[_ <: Product2[Long, V]], do_cache: Boolean = true) extends org.apache.spark.Partitioner {

    val max = {
        if (do_cache) rdd.cache()
        rdd.map(_._1).max
    }

    override def numPartitions: Int = math.ceil(max.toDouble/n_per_part).toInt
    override def getPartition(key: Any): Int = key match {
        case k:Long => (k/n_per_part).toInt
        case _ => (key.hashCode/n_per_part).toInt
    }
}

Создайте СДР из случайных строк и проиндексируйте ее:

val rdd = sc.parallelize(Array.tabulate(1000)(_ => scala.util.Random.alphanumeric.filter(_.isLetter).take(5).mkString))  
val rdd_idx = rdd.zipWithIndex.map(_.swap)

Создайте разделитель и примените его:

val partitioner = new IndexPartitioner(70, rdd_idx)
val rdd_part = rdd_idx.partitionBy(partitioner).values

Проверьте размеры разделов:

rdd_part
  .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
  .toDF("partition_number","number_of_records")
  .show

/**
+----------------+-----------------+
|               0|               70|
|               1|               70|
|               2|               70|
|               3|               70|
|               4|               70|
|               5|               70|
|               6|               70|
|               7|               70|
|               8|               70|
|               9|               70|
|              10|               70|
|              11|               70|
|              12|               70|
|              13|               70|
|              14|               20|
+----------------+-----------------+
*/

Один файл для каждого раздела:

import sqlContext.implicits._
rdd_part.toDF.write.format("com.databricks.spark.csv").save("/tmp/idx_part_test/")

(+1 для "_SUCCESS")

XXX$ ls /tmp/idx_part_test/ | wc -l
16
...