Я пытаюсь понять преимущества известного секционера в Apache Spark (v2.2.3)
, для которого я написал наивную реализацию алгоритма Page-Rank, следуя примеру кода ссылки - https://github.com/apache/spark/blob/master/examples/src/main/python/pagerank.py
linksRDD
относится к типу RDD[(String, Iterable[String])]
ranksRDD
относится к типу RDD[(String, Double)]
- Я использовал известный
HashPartitioner
, как указано в коде ниже.
val hashParts = new HashPartitioner(partitions = 10)
val partLinks = linksRDD.partitionBy(hashParts)
var rankUpdates = ranksRDD
for (_ <- 0 until numOfIters) {
val contributions = links
.join(rankUpdates)
.flatMap({
case (_, (outLinks, rank)) =>
val numOfOutLinks = outLinks.size
outLinks.map(x => (x, rank / numOfOutLinks))
})
rankUpdates = contributions
.reduceByKey(_ + _)
.mapValues(_ * 0.85 + 0.15)
}
rankUpdates.saveAsObjectFile("SOME_PATH") //ACTION TO TRIGGER THE COMPUTATION.
- Для сравнения я выполнил еще раз без разделителя. (как указано выше, две строки)
// NO PARTITIONER OBJECT IS PROVIDED/MENTIONED
val partLinks = linksRDD
val partRanks = ranksRDD
Теперь для кода, использующего ноу-хау HashPartitioner
, существует много этапов, созданных только с одной задачей partitionBy
, у которой нет родителя. Этап также не имеют дочерних этапов. Эти этапы не связаны с какими-либо действиями, они просто находятся между DAG.
Может кто-нибудь объяснить мне, почему эти стадии созданы? Поскольку суммирование всех случайных чисел для этапов с известным разделителем и без известного разделителя получается примерно одинаковым. Из-за этих дополнительных этапов не наблюдается никакого увеличения объема перетасовываемых данных.