Spark: вызов функции внутри mapPartitionsWithIndex - PullRequest
0 голосов
/ 07 июня 2018

Я получил очень странные результаты со следующим кодом.Я только хочу взять данные раздела и выполнить итерацию для каждого из данных, X раз.

Здесь я вызываю свою функцию для каждого раздела:

val myRDDResult = myRDD.mapPartitionsWithIndex( myFunction(_, _, limit), preservesPartitioning = true)

И функция такова:

private def myFunction (partitionIndex: Long,
                          partitionData: Iterator[Array[(LabeledPoint,Int,Int)]]), limit: Int): Iterator[String] = {

    var newData = ArrayBuffer[String]()
    if (partitionData.nonEmpty){
        val partDataMap = partitionData.next.map{ case (lp, _, neighId) => (lp, neighId) }.toMap
        var newString:String = ""
        for {
            (k1,_) <- partDataMap
            i  <- 0 to limit
            _ = {
             // ... some code to generate the content for `newString`
            newData.+=(newString)
           }
        }yield ()
    } 
    newData.iterator
}

Вот некоторые полученные значения:

partitionData   limit   newData   newData_expected
1640            250     411138   410000 (1640*250)
16256           27      288820   438912

Я не знаю, неправильно ли я понял концепцию моего кода.

Я такжепопытался изменить часть for для этой идеи: partDataMap.map{elem=> for (i <- 0 to limit){....}}

Есть предложения?

1 Ответ

0 голосов
/ 07 июня 2018

Во-первых, извините, потому что я отклонил ваш вопрос / проголосовал (ошибка клика), и поскольку я не отменил его в течение 10 минут, ТАК поддержал его голосование.

Что касается вашего кода, я думаю, что ваши ожидаемые результатыплохо, потому что я взял тот же код, что и вы, немного упростил его, и вместо того, чтобы получать 410000 элементов, я получил 411640 .Может быть, я что-то скопировал неправильно или проигнорировал некоторые вещи, но код, дающий 411640, выглядит следующим образом:

val limit = 250
val partitionData: Iterator[Array[Int]] = Seq((1 to 1640).toArray).toIterator
var newData = ArrayBuffer[String]()
if (partitionData.nonEmpty){
  val partDataMap = partitionData.next.map{ nr => nr.toString }

  for {
    value <- partDataMap
    i  <- 0 to limit
    _ = {
      newData.+=(s"${value}_${i}")
    }
  } yield ()
}
println(s"new buffer=${newData}")
println(s"Buffer size =  ${newData.size}")

Теперь, чтобы ответить на ваш вопрос о том, почему результаты mapWithPartitions отличаются от ваших ожиданий.ИМО это потому что твое преобразование из Array в Map.Если в вашем массиве у вас есть дублированный ключ, он будет учитываться только один раз.Это может объяснить, почему в обоих случаях (если мы считаем 411640 правильным ожидаемым числом), вы получите результаты ниже ожидаемых.Чтобы быть уверенным в этом, вы можете сравнить partDataMap.size с partitionData.next.size.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...