Есть ли эффективный способ разделить RDD с большим списком на несколько списков без выполнения сбора - PullRequest
1 голос
/ 14 мая 2019

У меня есть этот rdd, содержащий кортежи, и сбор их даст мне список.

[x1, x2, x3, x4, x5]

Но я хочу несколько кусков этого списка, например [ [x1,x2,x3], [x4,x5] ] Я могу сделать это, выполнив сначала сборЗатем следует разделить полученный список на куски.

Но я хочу это без выполнения сбора, поскольку сбор может вызвать ошибку пространства кучи и перенести все данные в драйвер, что неэффективно.

1 Ответ

0 голосов
/ 14 мая 2019

Вопрос: есть ли эффективный способ разбить RDD на большой список на несколько списков без выполнения сбора

Вместо того, чтобы собирать и модифицировать большой список в несколько списков, вы можете создать большой rdd для нескольких маленьких RDD для дальнейшей обработки ...

собирать большие СДР не очень хорошая идея. но если вы хотите разделить большой rdd на маленький, т. е. массив [RDD], который вы можете использовать с нижеследующим подходом, записанным в scala, вы можете перевести на python, увидев пример здесь .

Python Docs здесь

вы можете пойти на рандомсплитс смотри документы здесь

вы можете увидеть, как это было реализовано из кода, который доступен в git:

/**
   * Randomly splits this RDD with the provided weights.
   *
   * @param weights weights for splits, will be normalized if they don't sum to 1
   * @param seed random seed
   *
   * @return split RDDs in an array
   */
  def randomSplit(
      weights: Array[Double],
      seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
    require(weights.forall(_ >= 0),
      s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
    require(weights.sum > 0,
      s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")

    withScope {
      val sum = weights.sum
      val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
      normalizedCumWeights.sliding(2).map { x =>
        randomSampleWithRange(x(0), x(1), seed)
      }.toArray
    }
  }

Пример Scala (неудобно с python :-)): для python см. Документы здесь


import org.apache.log4j.Level
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Created by Ram Ghadiyaram
  */
object RDDRandomSplitExample {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]) {

    val spark = SparkSession.builder.
      master("local")
      .appName("RDDRandomSplitExample")
      .getOrCreate()


    val y = spark.sparkContext.parallelize(1 to 100)
    // break/split big rdd in to small rdd

    val splits: Array[RDD[Int]] = y.randomSplit(Array(0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1))

    splits.foreach(x => println("number of records in each rdd " + x.count))
  }
}

Результат :

number of records in each rdd 9
number of records in each rdd 9
number of records in each rdd 8
number of records in each rdd 7
number of records in each rdd 9
number of records in each rdd 17
number of records in each rdd 11
number of records in each rdd 9
number of records in each rdd 7
number of records in each rdd 6
number of records in each rdd 8

Вывод: Вы можете видеть почти одинаковое количество элементов в каждом СДР. Вы можете обработать каждый rdd без сбора оригинального большого rdd

...