У меня есть rdd. Я хочу сгруппировать его по некоторому свойству и сохранить каждую группу в отдельный файл (и получить список имен файлов результатов).
Самый наивный способ:
val rdd : RDD[Long] = ???
val byLastDigit: RDD[(Int, Long)] = rdd.map(n => ((n % 10).toInt, n))
val saved: Array[String] = byLastDigit.groupByKey().map((numbers: (Int, Iterable[Long])) => {
//save numbers into a file
???
}).collect()
Недостатком этого подхода является то, что он хранит в памяти все значения для ключа одновременно. Так что он будет плохо работать на огромных наборах данных.
Альтернативный подход:
byLastDigit.partitionBy(new HashPartitioner(1000)).mapPartitions((numbers: Iterator[(Int, Long)]) => {
//assume that all numbers in a partition have the same key
???
}).collect()
Так как количество разделов намного больше, чем количество ключей, каждый раздел, скорее всего, будет содержать числа только для одного ключа.
Плавно работает для огромных наборов данных. Но это уродливо и гораздо более подвержено ошибкам.
Можно ли сделать это лучше?