Scala Spark RDD, DataSet, PairRDD и Partitoning - PullRequest
0 голосов
/ 23 января 2019

В Scala Spark есть несколько методов, которые могут привести к разделению / перераспределению данных. Они включают partitionBy, coalesce, repartition, and textFile среди других функций, у которых в качестве параметра указано количество разделов. Ниже я использую textFile со спецификацией не менее 8 разделов. Я не хочу, чтобы преобразования отменяли эти разделы. Для сохранения разделов вам нужно persist результат разделения. Однако такие функции, как map и flatMap, не сохраняют разделы. Я считаю, что это может повлиять на производительность. У PairRDDS есть mapValues and flatMapValues, которые поддерживают разделы.

Существуют ли эквивалентные функции для DataSets and RDDs для map and flatMap, которые не портят разделы?

Если я все это перепутал, как RDD и DataSets поддерживают там разделы, учитывая, что операции map и flatMap являются ключевыми в их манипулировании.

val tweets:RDD[Tweet] = mySpark.sparkContext.textFile(path,8).map(parseTweet).persist()
val numerical_fields_Tweets:Dataset[Tweet] = tweets.toDS()

Below is a screenshot from a youtube video stating that a map in a pairRDD results in an RDD without a partitioner

1 Ответ

0 голосов
/ 23 января 2019

В Spark операции, которые не перераспределяют / не переставляют данные, сохраняют разделы (работая на ранее установленных разделах).map и flatMap являются такими операциями: они не изменят количество разделов.Кроме того, map не изменит количество строк внутри разделов или их порядок.

как RDD и DataSets поддерживают там разделы

Вы смешиваете две концепции:(1) разделитель, связанный с данными в точке его преобразования, и (2) разделы, на которые данные разбиваются.

Существует разница между тем, как данные разделены, и тем, какой разделитель связан с данными.,Как объяснено выше, map и flatMap не изменяют количество разделов, но не дают никаких гарантий относительно разделителя, связанного с данными.Рассмотрим RDD map:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

и MapPartitionsRDD:

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false, ...)

Так что, хотя map не перераспределяет данные, он не дает никаких гарантий относительно связанного разделителяс данными, потому что нет никаких ограничений на то, как map может изменять строки.

Пара RDD, т. е. RDD[(K, V)], несколько особенная в том смысле, что они часто являются результатом операции разделения и, если мыиспользуйте mapValues вместо map, мы можем быть уверены, что разделитель не изменился, потому что мы не касались «ключей».

/**
 * Pass each value in the key-value pair RDD through a map function without changing the keys;
 * this also retains the original RDD's partitioning.
 */
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}

Надеюсь, это поможет!

...