Я читаю исходный код DAGScheduler из ветви Spark 0.5.
Метод newStage
:
def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]]): Stage = {
cacheTracker.registerRDD(rdd.id, rdd.splits.size)
...
val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd))
...
}
Во-первых, он вызывает registerRDD
чтобы зарегистрировать rdd
, и фактически сохраняется как Hashset
.
Во-вторых, он создает новый Stage
.Внутри getParentStages
он будет возвращать родительские зависимости rdd
, но он также всегда будет регистрировать этот rdd
снова в дополнение к своим родителям.
Основная логика(псевдокод) getParentStages
- это:
def visit(rdd) {
if (rdd not visited) {
set rdd visited;
registerRDD(rdd);
for (each dep in rdd.dependencies) {
visit(dep.rdd)
}
}
}
visit(rdd)
На мой взгляд, поскольку этот rdd
будет зарегистрирован в getParentStages
, необходимо ли его регистрировать в методе newStage
?
Конечно, поскольку базовой структурой данных является HashSet
, при повторной регистрации ничего не произойдет.