Spark применяет преобразования лениво , т.е. только когда вы вызываете действие типа collect
или take
et c. Таким образом, ваш звонок на sortByKey()
применяется только после того, как вы позвоните на collect
.
. Я создал приложение на основе ваших образцов данных. Я напечатал зависимость RDD, используя toDebugString
, чтобы вы могли получить представление о том, что происходит за кулисами.
Приложение
import org.apache.spark.sql.SparkSession
object PlaygroundApp extends App {
val spark = SparkSession
.builder()
.appName("Stackoverflow App")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
val lines = sc.parallelize(Seq(
(0, 5524.9497),
(51, 4975.2197),
(1, 4958.5996),
(52, 5245.0605),
(2, 5994.591),
(53, 4945.3),
(9, 5322.6494),
(10, 4819.6997))
)
val collectThemAll = lines.reduceByKey((x, y) => x + y)
println("---Before sort")
collectThemAll.foreach(println)
println(collectThemAll.toDebugString)
println()
println("---After sort")
val sorted = collectThemAll.sortByKey()
sorted.collect().foreach(println)
println(sorted.toDebugString)
}
Вывод
---Before sort
(2,5994.591)
(53,4945.3)
(0,5524.9497)
(52,5245.0605)
(10,4819.6997)
(9,5322.6494)
(1,4958.5996)
(51,4975.2197)
(12) ShuffledRDD[1] at reduceByKey at PlaygroundApp.scala:28 []
+-(12) ParallelCollectionRDD[0] at parallelize at PlaygroundApp.scala:17 []
---After sort
(0,5524.9497)
(1,4958.5996)
(2,5994.591)
(9,5322.6494)
(10,4819.6997)
(51,4975.2197)
(52,5245.0605)
(53,4945.3)
(8) ShuffledRDD[4] at sortByKey at PlaygroundApp.scala:37 []
+-(12) ShuffledRDD[1] at reduceByKey at PlaygroundApp.scala:28 []
+-(12) ParallelCollectionRDD[0] at parallelize at PlaygroundApp.scala:17 []