Я не уверен, чего вы пытаетесь достичь, и я в некотором роде новичок по сравнению с некоторыми экспертами здесь.
Я представляю кое-что, что может дать вам представление о том, как делать то, что я думаю правильно, и сделать несколько комментариев:
- Вы, кажется, получаете разделы явно и вызываете mapPartitions - 1-йдля меня.
- RDD внутри mapPartitions и различные SPARK SCALA не будут летать;речь идет об итерациях, и я думаю, что вам нужно перейти на уровень SCALA.
- Сериализуемая ошибка возникает из-за выполнения List [Int].
Вот пример, показывающий разделение индекса вдольс этими соответствующими значениями индекса.
import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
// from your stuff, left in
val parallel: RDD[Int] = sc.parallelize(1 to 9, 4)
val mapped = parallel.mapPartitionsWithIndex{
(index, iterator) => {
println("Called in Partition -> " + index)
val myList = iterator.toList
myList.map(x => (index, x)).groupBy( _._1 ).mapValues( _.map( _._2 ) ).toList.iterator
}
}
mapped.collect()
Это возвращает следующее, что немного напоминает то, что, как мне кажется, вы хотели:
res38: Array[(Int, List[Int])] = Array((0,List(1, 2)), (1,List(3, 4)), (2,List(5, 6)), (3,List(7, 8, 9)))
Заключительное примечание: документацию и тому подобное не так легко выполнить, выне получить все это из примера подсчета слов!
Итак, надеюсь, это поможет.
Я думаю, что это может привести вас к правильному пути туда, куда вы хотите пойти, я не совсем мог это увидеть, но, может быть, теперь вы можете видеть лес за деревьями.