Исходный код Spark: новый этап в DAGScheduler - PullRequest
0 голосов
/ 23 ноября 2018

Я читаю исходный код DAGScheduler из ветви Spark 0.5.

Метод newStage:

  def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]]): Stage = {
    cacheTracker.registerRDD(rdd.id, rdd.splits.size)
    ...
    val id = nextStageId.getAndIncrement()
    val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd))
    ...
  }

Во-первых, он вызывает registerRDDчтобы зарегистрировать rdd, и фактически сохраняется как Hashset.

Во-вторых, он создает новый Stage.Внутри getParentStages он будет возвращать родительские зависимости rdd, но он также всегда будет регистрировать этот rdd снова в дополнение к своим родителям.

Основная логика(псевдокод) getParentStages - это:

def visit(rdd) {
   if (rdd not visited) {
      set rdd visited;
      registerRDD(rdd);
      for (each dep in rdd.dependencies) {
        visit(dep.rdd)
      }
  }
}
visit(rdd)

На мой взгляд, поскольку этот rdd будет зарегистрирован в getParentStages, необходимо ли его регистрировать в методе newStage?

Конечно, поскольку базовой структурой данных является HashSet, при повторной регистрации ничего не произойдет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...