Как правильно использовать функцию mapPartitions - PullRequest
0 голосов
/ 04 декабря 2018

Я делаю программу с большими данными, и поэтому я использую Spark и Scala.Мне нужно разбить базу данных, и для этого я использую

var data0 = conf.dataBase.repartition (8) .persist (StorageLevel.MEMORY_AND_DISK_SER)

, но затем мне нужно сделать что-то в разделе, прежде чем продолжить работу с частью базы данных, соответствующей этому разделу, и для этого я использую

var tester = data0.mapPartitions {x =>
   configFuzzyPredProblem ()
   Strategy.getStrategy.executeStrategy (conf.iterByRun, 5, GeneratorType.HillClimbing)
 } .persist (StorageLevel.MEMORY_AND_DISK_SER)

В методе executeStrategy() Я использую базу данных, но я не знаю, является ли она глобальной или соответствующей этой секции.Как я могу узнать, какой из них я использую, и затем выполнять обработку раздела только с базой данных этого раздела?

1 Ответ

0 голосов
/ 04 декабря 2018

Вот простой пример использования mapPartitionsWithIndex, который следует тем же правилам mapPartitions - исключая аспект индекса.

Вы можете видеть, что внутри mapPartitions вам нужно обработать целое число, Interator Int в этом примере.В этом случае 3 раздела обрабатываются, в вашем случае 8, либо с некоторыми записями, либо, возможно, с нулевыми записями.

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
    iter.map(x => index + "," + x)
}
val rdd2 = rdd1.mapPartitionsWithIndex(myfunc)

Я не могу видеть в вашей функции, но я предполагаю, что это нормально, и он будет обрабатывать раздел - часть вашей базы данных.

...