Почему фильтр не сохраняет разбиение? - PullRequest
0 голосов
/ 11 мая 2018

Это цитата из jaceklaskowski.gitbooks.io .

Некоторые операции, например, map, flatMap, filter, не сохраняют разбиение.map, flatMap, операции фильтра применяют функцию к каждому разделу.

Я не понимаю, почему фильтр не сохраняет разбиение.Это просто получение подмножества каждого раздела, которые удовлетворяют условию, поэтому я думаю, что разделы могут быть сохранены.Почему это не так?

Ответы [ 2 ]

0 голосов
/ 11 мая 2018

Фильтр сохраняет разбиение, по крайней мере, это предполагает исходный код фильтра (preservesPartitioning = true):

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
0 голосов
/ 11 мая 2018

Вы, конечно, правы. Цитата просто неверна. filter сохраняет разбиение (по той причине, которую вы уже описали), и тривиально подтвердить, что

val rdd = sc.range(0, 10).map(x => (x % 3, None)).partitionBy(
  new org.apache.spark.HashPartitioner(11)
)

rdd.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

val filteredRDD = rdd.filter(_._1 == 3)
filteredRDD.partitioner
// Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@b)

rdd.partitioner == filteredRDD.partitioner
// Boolean = true

Это остается в отличие от операций типа map, которые не сохраняют разделение (Partitioner):

rdd.map(identity _).partitioner
// Option[org.apache.spark.Partitioner] = None

Datasets немного более тонкие, поскольку фильтры обычно сдвигаются вниз, но в целом поведение аналогично.

...