В Spark операции, которые не перераспределяют / не переставляют данные, сохраняют разделы (работая на ранее установленных разделах).map
и flatMap
являются такими операциями: они не изменят количество разделов.Кроме того, map
не изменит количество строк внутри разделов или их порядок.
как RDD и DataSets поддерживают там разделы
Вы смешиваете две концепции:(1) разделитель, связанный с данными в точке его преобразования, и (2) разделы, на которые данные разбиваются.
Существует разница между тем, как данные разделены, и тем, какой разделитель связан с данными.,Как объяснено выше, map
и flatMap
не изменяют количество разделов, но не дают никаких гарантий относительно разделителя, связанного с данными.Рассмотрим RDD map
:
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
и MapPartitionsRDD
:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false, ...)
Так что, хотя map
не перераспределяет данные, он не дает никаких гарантий относительно связанного разделителяс данными, потому что нет никаких ограничений на то, как map
может изменять строки.
Пара RDD, т. е. RDD[(K, V)]
, несколько особенная в том смысле, что они часто являются результатом операции разделения и, если мыиспользуйте mapValues
вместо map
, мы можем быть уверены, что разделитель не изменился, потому что мы не касались «ключей».
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
val cleanF = self.context.clean(f)
new MapPartitionsRDD[(K, U), (K, V)](self,
(context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
preservesPartitioning = true)
}
Надеюсь, это поможет!