Это только частичное решение, но я думаю, что если ваши разделы сделаны правильно, вы можете использовать mapPartitions
, чтобы выполнить работу для каждого раздела. Примерно так:
val rdd: RDD[(Int, String, String)] = ...
rdd.mapPartitions { it =>
it.foldLeft(List.empty[(Int, String, String)]) {
case (Nil, e) => List(e)
case ((i, ci1, ci2) :: tail, (j, cj1, cj2)) =>
if (ci1 == cj1 || ci2 == cj2)
(i, ci1, ci2) :: tail
else
(j, cj1, cj2) :: (i, ci1, ci2) :: tail
}
}