Обработка данных MapReduce Spark с помощью rdd (scala) - PullRequest
0 голосов
/ 22 марта 2019

У меня есть большие данные, и я хочу использовать mapRuduce на этих данных, и я ничего не нашел для этой задачи.(Язык: Scala)

Данные для этого процесса:

Y,20,01
G,18,40
J,19,10
D,50,10
R,20,01
Z,18,40
T,19,10
Q,50,10
... (2.000+)

Для всех этих данных, которые я хочу загрузить на карты: (например)

Y,20,01
G,18,40
J,19,10     MAP 1
D,50,10
---------------------
R,20,01
Z,18,40     MAP 2
T,19,10
Q,50,10
... (2.000+)

На всех картах локально я хочу найти минимальные столбцы.

После того, как все карты отправлены на уменьшение локальных данных, а уменьшение находит глобальные минимальные столбцы.

Вы можете мне помочь?Как я это сделаю?

1 Ответ

0 голосов
/ 22 марта 2019

Если я правильно понимаю, вы хотите прочитать эти данные в несколько разделов и вызвать некоторую функцию в каждом разделе, а не выполнять итерации по каждому элементу.

Давайте рассмотрим, как вы хотите найти максимум для каждого из заданныхpartition.

Вы можете использовать mapPartitions или mapPartitionsWithIndex, оба из которых будут работать для каждого из разделов.

mapPartitions примет вход, который будет итератором, а mapPartitionsWithIndex должен принять 2 параметра, а именно index и iterator.

Давайте определим функцию, которая получает максимум для данного итератора.

//Function to find the max for an iterator and return back an iterator with only the max element
def findMax(numbers :Iterator[Int]) : Iterator[Int] = {
  val max = numbers.max;
  Iterator(max)
}
findMax(Iterator(7,8,9,2,3)).next
//9: Int

Давайте создадим rdd с двумя разделами и распечатаем элементы каждого раздела.

val rdd = sc.parallelize(1 to 30, 2)

val mapped = rdd.mapPartitionsWithIndex{ (index, iterator) => { 
  val myList = iterator.toList
  val item = Map(index -> myList)
  item.iterator
  }
}

mapped.collect().foreach(println)
/*
(0,List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15))
(1,List(16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30))
*/

Теперь мы видим, что есть 2 раздела - 0 и 1.

Далее мы найдем максимум для каждого раздела, используя нашу findMax функцию, определенную выше.

val maxByPartitions = rdd.mapPartitions(findMax).collect()
maxByPartitions: Array[Int] = Array(15, 30)
...