Если я правильно понимаю, вы хотите прочитать эти данные в несколько разделов и вызвать некоторую функцию в каждом разделе, а не выполнять итерации по каждому элементу.
Давайте рассмотрим, как вы хотите найти максимум для каждого из заданныхpartition.
Вы можете использовать mapPartitions
или mapPartitionsWithIndex
, оба из которых будут работать для каждого из разделов.
mapPartitions
примет вход, который будет итератором, а mapPartitionsWithIndex
должен принять 2 параметра, а именно index
и iterator
.
Давайте определим функцию, которая получает максимум для данного итератора.
//Function to find the max for an iterator and return back an iterator with only the max element
def findMax(numbers :Iterator[Int]) : Iterator[Int] = {
val max = numbers.max;
Iterator(max)
}
findMax(Iterator(7,8,9,2,3)).next
//9: Int
Давайте создадим rdd с двумя разделами и распечатаем элементы каждого раздела.
val rdd = sc.parallelize(1 to 30, 2)
val mapped = rdd.mapPartitionsWithIndex{ (index, iterator) => {
val myList = iterator.toList
val item = Map(index -> myList)
item.iterator
}
}
mapped.collect().foreach(println)
/*
(0,List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
(1,List(16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30))
*/
Теперь мы видим, что есть 2 раздела - 0
и 1
.
Далее мы найдем максимум для каждого раздела, используя нашу findMax
функцию, определенную выше.
val maxByPartitions = rdd.mapPartitions(findMax).collect()
maxByPartitions: Array[Int] = Array(15, 30)