Я использую Spark в режиме local
из среды IDE / eclipse.
Я вижу, что Spark UI создает множество заданий для простой агрегации. Почему?
import org.apache.spark.sql.SparkSession
trait SparkSessionWrapper {
lazy val spark: SparkSession = {
SparkSession
.builder()
.master("local[2]")
.appName("Spark Me")
.getOrCreate()
}
spark.sparkContext.setLogLevel("WARN")
}
Приложение Spark выглядит следующим образом:
object RowNumberCalc
extends App
with SparkSessionWrapper {
import spark.implicits._
val cityDf = Seq(
("London", "Harish",5500,"2019-10-01"),
("NYC","RAJA",11121,"2019-10-01"),
("SFO","BABU",77000,"2019-10-01"),
("London","Rick",7500,"2019-09-01"),
("NYC","Jenna",6511,"2019-09-01"),
("SFO","Richard",234567,"2019-09-01"),
("London","Harish",999999,"2019-08-01"),
("NYC","Sam",1234,"2019-08-01"),
("SFO","Dylan",45678,"2019-08-01")).toDF("city","name","money","month_id")
cityDf.createOrReplaceTempView("city_table")
val totalMoneySql =
"""
|select city, sum(money) from city_table group by 1 """.stripMargin
spark.sql(totalMoneySql).show(false)
System.in.read
spark.stop()
}
Как показано простое вычисление Сумма денег для каждого города сейчас ИСКРА-UI показывает ==> 5 заданий каждый с 2 этапов !!! На вкладке
И SQL также отображаются 5 заданий.
Но Физический план показывает правильный Этап деление
== Physical Plan ==
CollectLimit 21
+- *(2) LocalLimit 21
+- *(2) HashAggregate(keys=[city#9], functions=[sum(cast(money#11 as bigint))], output=[city#9, sum(money)#24])
+- Exchange hashpartitioning(city#9, 200)
+- *(1) HashAggregate(keys=[city#9], functions=[partial_sum(cast(money#11 as bigint))], output=[city#9, sum#29L])
+- LocalTableScan [city#9, money#11]
ОТ ГДЕ / КАК 5 РАБОТ запускаются ???