Я работаю с программой искры, которой необходимо постоянно обновлять некоторые СДР в цикле:
var totalRandomPath: RDD[String] = null
for (iter <- 0 until config.numWalks) {
var randomPath: RDD[String] = examples.map { case (nodeId, clickNode) =>
clickNode.path.mkString("\t")
}
for (walkCount <- 0 until config.walkLength) {
randomPath = edge2attr.join(randomPath.mapPartitions { iter =>
iter.map { pathBuffer =>
val paths: Array[String] = pathBuffer.split("\t")
(paths.slice(paths.size - 2, paths.size).mkString(""), pathBuffer)
}
}).mapPartitions { iter =>
iter.map { case (edge, (attr, pathBuffer)) =>
try {
if (pathBuffer != null && pathBuffer.nonEmpty && attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty) {
val nextNodeIndex: PartitionID = GraphOps.drawAlias(attr.J, attr.q)
val nextNodeId: VertexId = attr.dstNeighbors(nextNodeIndex)
s"$pathBuffer\t$nextNodeId"
} else {
pathBuffer //add
}
} catch {
case e: Exception => throw new RuntimeException(e.getMessage)
}
}.filter(_ != null)
}
}
if (totalRandomPath != null) {
totalRandomPath = totalRandomPath.union(randomPath)
} else {
totalRandomPath = randomPath
}
}
В этой программе СДР totalRandomPath
и randomPath
постоянно обновляются с большим количествомоперации преобразования: join
и mapPartitions
.Эта программа закончится действием collect
.
Так нужно ли сохранять эти постоянно обновляемые RDD (totalRandomPath, randomPath), чтобы ускорить мою искровую программу?
И я заметил, что эта программа работает быстро на машине с одним узлом, но замедляется при запуске на трехузел кластера, почему это происходит?