Почему Spark запускает 5 заданий для простой агрегации? - PullRequest
4 голосов
/ 25 октября 2019

Я использую Spark в режиме local из среды IDE / eclipse.

Я вижу, что Spark UI создает множество заданий для простой агрегации. Почему?

import org.apache.spark.sql.SparkSession

trait SparkSessionWrapper {

  lazy val spark: SparkSession = {
    SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Me")
      .getOrCreate()
  }

  spark.sparkContext.setLogLevel("WARN")

} 

Приложение Spark выглядит следующим образом:

object RowNumberCalc
  extends App
  with SparkSessionWrapper {

  import spark.implicits._

  val cityDf = Seq(
    ("London", "Harish",5500,"2019-10-01"),
    ("NYC","RAJA",11121,"2019-10-01"),
    ("SFO","BABU",77000,"2019-10-01"),
    ("London","Rick",7500,"2019-09-01"),
    ("NYC","Jenna",6511,"2019-09-01"),
    ("SFO","Richard",234567,"2019-09-01"),
    ("London","Harish",999999,"2019-08-01"),
    ("NYC","Sam",1234,"2019-08-01"),
    ("SFO","Dylan",45678,"2019-08-01")).toDF("city","name","money","month_id")

  cityDf.createOrReplaceTempView("city_table")
  val totalMoneySql =
    """
      |select city, sum(money) from city_table group by 1 """.stripMargin
  spark.sql(totalMoneySql).show(false)


  System.in.read
  spark.stop()

}

Как показано простое вычисление Сумма денег для каждого города сейчас ИСКРА-UI показывает ==> 5 заданий каждый с 2 этапов !!! На вкладке enter image description here

И SQL также отображаются 5 заданий.

enter image description here

Но Физический план показывает правильный Этап деление

== Physical Plan ==
CollectLimit 21
+- *(2) LocalLimit 21
   +- *(2) HashAggregate(keys=[city#9], functions=[sum(cast(money#11 as bigint))], output=[city#9, sum(money)#24])
      +- Exchange hashpartitioning(city#9, 200)
         +- *(1) HashAggregate(keys=[city#9], functions=[partial_sum(cast(money#11 as bigint))], output=[city#9, sum#29L])
            +- LocalTableScan [city#9, money#11]

ОТ ГДЕ / КАК 5 РАБОТ запускаются ???

1 Ответ

4 голосов
/ 27 октября 2019

tl; dr У вас очень мало строк для работы (9 в качестве основного входа и 3 агрегата) в 200 разделах по умолчанию и, таким образом, 5 заданий Spark для удовлетворения требований Dataset.show чтобы показать 20 строк.


Другими словами, то, что вы испытываете, * Dataset.show -конкретно (что, кстати, не для больших наборов данных, не так ли?)

По умолчанию Dataset.show отображает 20 строк. Начинается с 1 раздела и занимает до 20 строк. Если строк недостаточно, он умножается на 4 (если я не ошибаюсь) и сканирует остальные 4 раздела, чтобы найти пропущенные строки. Это работает до тех пор, пока не будет собрано 20 строк.

Количество выходных строк последних HashAggregate равно 3 строкам.

В зависимости от того, какие разделы находятся в Spark, эти 3 строки могут выполнять одну, две илибольше рабочих мест. Это сильно зависит от хеша строк (для HashPartitioner).


Если вы действительно хотите увидеть одно задание Spark для этого количества строк (9 для ввода), запустите приложение Spark. со свойством конфигурации spark.sql.shuffle.partitions как 1.

Это сделает вычисления с 1 разделом после агрегации и всеми результирующими строками в одном разделе.

...