rdd.map вызывает функцию дважды, а не один раз - PullRequest
0 голосов
/ 25 июня 2019

Я новичок в программировании Scala, и в настоящее время я работаю с RDD.Я пытаюсь передать RDD в функцию и хотел бы, чтобы функция возвращала его, чтобы я мог сохранить его в новом RDD.Для этой цели я использую карту.Но map вызывает функцию дважды, тогда как внутри RDD есть только одна запись.Он отлично работает, когда я использовал collect.foreach () вместо map, но я не могу сохранить значения обновлений в новом RDD, так как он возвращает значение в Unit.

Этот код возвращает значение из функции обновления, но вызывает функцию дважды:

temp_rdd = my_rdd.map{x => update(x)}

В то время как этот код вызывает его один раз совершенно, но я не могу изменить значения СДР:

my_rdd.collect().foreach{x => update(x)}

Функция foreach возвращает формат в «Единице», поскольку я не могу сохранить его в новом СДР.Я ищу способ сохранить обновленные значения в новом СДР.

1 Ответ

2 голосов
/ 25 июня 2019

С https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

map - это преобразование , которое передает каждый элемент набора данных через функцию и возвращает новый RDD, представляющий результаты. Все преобразования в Spark являются ленивыми и вычисляются, когда действие требует, чтобы результат был возвращен в программу драйвера. По умолчанию каждый преобразованный RDD может пересчитываться каждый раз, когда вы выполняете над ним действие (или вы можете сохранить RDD в памяти, используя .cache()).

С другой стороны, действия (например, collect или reduce) возвращают значение (не СДР) программе драйвера после выполнения вычисления на СДР.

Ниже приведен пример кэширования СДР для предотвращения его многократного вычисления

val array = Array("1", "2", "3")
val rdd = sc.parallelize(array)
var i = 0
val mapRdd = rdd.map(s"$i: " + _)
mapRdd.take(3).foreach(println) // mapRdd is computed here...
// Output
// 0: 1
// 0: 2
// 0: 3

i = i + 1
mapRdd.take(3).foreach(println) // ... and here
// Output
// 1: 1
// 1: 2
// 1: 3

val cachedMapRdd = rdd.map(s"$i: " + _).cache()
cachedMapRdd.take(3).foreach(println) // cachedMapRdd is computed here...
// Output
// 1: 1
// 1: 2
// 1: 3

i = i + 1
cachedMapRdd.take(3).foreach(println) // ... but not here
// Output
// 1: 1
// 1: 2
// 1: 3
...