Я бы порекомендовал использовать метод U,combOp:(U,U)=>U)(implicitevidence$30:scala.reflect.ClassTag[U]):U" rel="nofollow noreferrer"> агрегирования СДР:
val rdd = sc.textFile("/path/to/textfile").
map(_.split(","))
// res1: Array[Array[String]] = Array(
// Array(20, 1, helloworld, alaaa), Array(2, 3, world, neww), Array(1, 223, ala, 12341234)
// )
val seqOp = (m: Array[Int], r: Array[String]) =>
(r zip m).map( t => Seq(t._1.length, t._2).max )
val combOp = (m1: Array[Int], m2: Array[Int]) =>
(m1 zip m2).map( t => Seq(t._1, t._2).max )
val size = rdd.collect.head.size
rdd.
aggregate( Array.fill[Int](size)(0) )( seqOp, combOp ).
zipWithIndex.map(_.swap).
toMap
// res2: scala.collection.immutable.Map[Int,Int] = Map(0 -> 2, 1 -> 3, 2 -> 10, 3 -> 8)
Обратите внимание, что aggregate
принимает:
- массив из 0 (размером равнымк размеру строки rdd) в качестве начального значения
- функция
seqOp
для вычисления максимальной длины строки в пределах раздела и - другая функция
combOp
для объединения результатов по разделам дляокончательные максимальные значения.