Вы можете реализовать RDD, который выполняет генерацию случайных данных параллельно, как в следующем примере.
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
// Each random partition will hold `numValues` items
final class RandomPartition[A: ClassTag](val index: Int, numValues: Int, random: => A) extends Partition {
def values: Iterator[A] = Iterator.fill(numValues)(random)
}
// The RDD will parallelize the workload across `numSlices`
final class RandomRDD[A: ClassTag](@transient private val sc: SparkContext, numSlices: Int, numValues: Int, random: => A) extends RDD[A](sc, deps = Seq.empty) {
// Based on the item and executor count, determine how many values are
// computed in each executor. Distribute the rest evenly (if any).
private val valuesPerSlice = numValues / numSlices
private val slicesWithExtraItem = numValues % numSlices
// Just ask the partition for the data
override def compute(split: Partition, context: TaskContext): Iterator[A] =
split.asInstanceOf[RandomPartition[A]].values
// Generate the partitions so that the load is as evenly spread as possible
// e.g. 10 partition and 22 items -> 2 slices with 3 items and 8 slices with 2
override protected def getPartitions: Array[Partition] =
((0 until slicesWithExtraItem).view.map(new RandomPartition[A](_, valuesPerSlice + 1, random)) ++
(slicesWithExtraItem until numSlices).view.map(new RandomPartition[A](_, valuesPerSlice, random))).toArray
}
Получив это, вы можете использовать его, передав собственный генератор случайных данных, чтобы получить RDD[Int]
val rdd = new RandomRDD(spark.sparkContext, 10, 22, scala.util.Random.nextInt(100) + 1)
rdd.foreach(println)
/*
* outputs:
* 30
* 86
* 75
* 20
* ...
*/
или RDD[(Int, Int, Int)]
def rand = scala.util.Random.nextInt(100) + 1
val rdd = new RandomRDD(spark.sparkContext, 10, 22, (rand, rand, rand))
rdd.foreach(println)
/*
* outputs:
* (33,22,15)
* (65,24,64)
* (41,81,44)
* (58,7,18)
* ...
*/
и, конечно, вы можете очень легко обернуть его в DataFrame
:
spark.createDataFrame(rdd).show()
/*
* outputs:
* +---+---+---+
* | _1| _2| _3|
* +---+---+---+
* |100| 48| 92|
* | 34| 40| 30|
* | 98| 63| 61|
* | 95| 17| 63|
* | 68| 31| 34|
* .............
*/
Обратите внимание, как в этом случае сгенерированные данные отличаются каждый раз при воздействии RDD
/ DataFrame
.Изменяя реализацию RandomPartition
для фактического хранения значений вместо генерации их на лету, вы можете иметь стабильный набор случайных элементов, сохраняя при этом гибкость и масштабируемость этого подхода.
Один приятныйСвойство подхода без сохранения состояния заключается в том, что вы можете генерировать огромный набор данных даже локально.На моем ноутбуке через несколько секунд запустилось следующее:
new RandomRDD(spark.sparkContext, 10, Int.MaxValue, 42).count
// returns: 2147483647