Нужно ли мне постоянно обновлять СДР? - PullRequest
0 голосов
/ 24 февраля 2019

Я работаю с программой искры, которой необходимо постоянно обновлять некоторые СДР в цикле:

var totalRandomPath: RDD[String] = null
for (iter <- 0 until config.numWalks) {
  var randomPath: RDD[String] = examples.map { case (nodeId, clickNode) =>
    clickNode.path.mkString("\t")
  }

  for (walkCount <- 0 until config.walkLength) {
    randomPath = edge2attr.join(randomPath.mapPartitions { iter =>
      iter.map { pathBuffer =>
        val paths: Array[String] = pathBuffer.split("\t")

        (paths.slice(paths.size - 2, paths.size).mkString(""), pathBuffer)
      }
    }).mapPartitions { iter =>
      iter.map { case (edge, (attr, pathBuffer)) =>
        try {
          if (pathBuffer != null && pathBuffer.nonEmpty && attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty) {
            val nextNodeIndex: PartitionID = GraphOps.drawAlias(attr.J, attr.q)
            val nextNodeId: VertexId = attr.dstNeighbors(nextNodeIndex)
            s"$pathBuffer\t$nextNodeId"
          } else {
            pathBuffer //add
          }
        } catch {
          case e: Exception => throw new RuntimeException(e.getMessage)
        }
      }.filter(_ != null)
    }
  }

  if (totalRandomPath != null) {
    totalRandomPath = totalRandomPath.union(randomPath)
  } else {
    totalRandomPath = randomPath
  }
}

В этой программе СДР totalRandomPath и randomPath постоянно обновляются с большим количествомоперации преобразования: join и mapPartitions.Эта программа закончится действием collect.

Так нужно ли сохранять эти постоянно обновляемые RDD (totalRandomPath, randomPath), чтобы ускорить мою искровую программу?
И я заметил, что эта программа работает быстро на машине с одним узлом, но замедляется при запуске на трехузел кластера, почему это происходит?

1 Ответ

0 голосов
/ 24 февраля 2019

Да, вам необходимо сохранить обновленную СДР, а также отключить устаревшую СДР

var totalRandomPath:RDD[String] = spark.sparkContext.parallelize(List.empty[String]).cache()   
for (iter <- 0 until config.numWalks){

    // existing logic

    val tempRDD = totalRandomPath.union(randomPath).cache()
    tempRDD foreach { _ => } //this will trigger cache operation for tempRDD immediately  
    totalRandomPath.unpersist() //unpersist old RDD which is no longer needed
    totalRandomPath = tempRDD   // point totalRandomPath to updated RDD
}
...