К сожалению, ответ Scala, но он работает.
Сначала определите пользовательский разделитель:
class IndexPartitioner[V](n_per_part: Int, rdd: org.apache.spark.rdd.RDD[_ <: Product2[Long, V]], do_cache: Boolean = true) extends org.apache.spark.Partitioner {
val max = {
if (do_cache) rdd.cache()
rdd.map(_._1).max
}
override def numPartitions: Int = math.ceil(max.toDouble/n_per_part).toInt
override def getPartition(key: Any): Int = key match {
case k:Long => (k/n_per_part).toInt
case _ => (key.hashCode/n_per_part).toInt
}
}
Создайте СДР из случайных строк и проиндексируйте ее:
val rdd = sc.parallelize(Array.tabulate(1000)(_ => scala.util.Random.alphanumeric.filter(_.isLetter).take(5).mkString))
val rdd_idx = rdd.zipWithIndex.map(_.swap)
Создайте разделитель и примените его:
val partitioner = new IndexPartitioner(70, rdd_idx)
val rdd_part = rdd_idx.partitionBy(partitioner).values
Проверьте размеры разделов:
rdd_part
.mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
.toDF("partition_number","number_of_records")
.show
/**
+----------------+-----------------+
| 0| 70|
| 1| 70|
| 2| 70|
| 3| 70|
| 4| 70|
| 5| 70|
| 6| 70|
| 7| 70|
| 8| 70|
| 9| 70|
| 10| 70|
| 11| 70|
| 12| 70|
| 13| 70|
| 14| 20|
+----------------+-----------------+
*/
Один файл для каждого раздела:
import sqlContext.implicits._
rdd_part.toDF.write.format("com.databricks.spark.csv").save("/tmp/idx_part_test/")
(+1 для "_SUCCESS")
XXX$ ls /tmp/idx_part_test/ | wc -l
16