Мой вопрос : Почему Spark вычисляет sum
и count
для каждого раздела, выполняет ненужное (IMHO) перемешивание (Exchange hashpartitioning
), а затем вычисляет среднее значение в HashAggregate
?
Что можно было сделать : Рассчитать среднее значение для каждого раздела и затем объединить (объединить) результаты.
подробности:
Я читаю данные из таблицы Hive, определенной ниже, которая разбита по дате.
spark.sql("""Create External Table If Not Exists daily_temp.daily_temp_2014
(
state_name string,
...
) Partitioned By (
date_local string
)
Location "./daily_temp/"
Stored As ORC""")
Он состоит из ежедневного измерения температуры в различных точках США, загруженного с веб-сайта EPA .
Используя приведенный ниже код, данные загружаются из таблицы Hive в PySpark DataFrame:
spark = (
SparkSession.builder
.master("local")
.appName("Hive Partition Test")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.getOrCreate()
)
my_df = spark.sql("select * from daily_temp.daily_temp_2014")
Я бы хотел рассчитать среднесуточную температуру для каждого штата.
daily_state_mean = (
my_df
.groupBy(
my_df.date_local,
my_df.state_name
)
.agg({"arithmetic_mean":"mean"})
)
И это часть физического (исполнительного) плана:
+- *(2) HashAggregate(keys=[date_local#3003, state_name#2998], functions=[avg(cast(arithmetic_mean#2990 as double))], output=[date_local#3003, state_name#2998, avg(CAST(arithmetic_mean AS DOUBLE))#3014])
+- Exchange hashpartitioning(date_local#3003, state_name#2998, 365)
+- *(1) HashAggregate(keys=[date_local#3003, state_name#2998], functions=[partial_avg(cast(arithmetic_mean#2990 as double))], output=[date_local#3003, state_name#2998, sum#3021, count#3022L])
+- HiveTableScan [arithmetic_mean#2990, state_name#2998, date_local#3003], HiveTableRelation `daily_temp`.`daily_temp_2014`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [...], [date_local#3003]
Ваши советы и идеи высоко ценятся.