Неожиданное перемешивание при вычислении среднего значения по разделам из данных улья - PullRequest
0 голосов
/ 03 мая 2018

Мой вопрос : Почему Spark вычисляет sum и count для каждого раздела, выполняет ненужное (IMHO) перемешивание (Exchange hashpartitioning), а затем вычисляет среднее значение в HashAggregate?

Что можно было сделать : Рассчитать среднее значение для каждого раздела и затем объединить (объединить) результаты.

подробности:

Я читаю данные из таблицы Hive, определенной ниже, которая разбита по дате.

spark.sql("""Create External Table If Not Exists daily_temp.daily_temp_2014 
(
 state_name string,
 ... 
) Partitioned By (
 date_local string
)
Location "./daily_temp/" 
Stored As ORC""")

Он состоит из ежедневного измерения температуры в различных точках США, загруженного с веб-сайта EPA .

Используя приведенный ниже код, данные загружаются из таблицы Hive в PySpark DataFrame:

spark = (
    SparkSession.builder
    .master("local")
    .appName("Hive Partition Test")
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()
)

my_df = spark.sql("select * from daily_temp.daily_temp_2014")

Я бы хотел рассчитать среднесуточную температуру для каждого штата.

daily_state_mean = (
   my_df
   .groupBy(
       my_df.date_local,
       my_df.state_name
   )
   .agg({"arithmetic_mean":"mean"})
)

И это часть физического (исполнительного) плана:

+- *(2) HashAggregate(keys=[date_local#3003, state_name#2998], functions=[avg(cast(arithmetic_mean#2990 as double))], output=[date_local#3003, state_name#2998, avg(CAST(arithmetic_mean AS DOUBLE))#3014])
  +- Exchange hashpartitioning(date_local#3003, state_name#2998, 365)
     +- *(1) HashAggregate(keys=[date_local#3003, state_name#2998], functions=[partial_avg(cast(arithmetic_mean#2990 as double))], output=[date_local#3003, state_name#2998, sum#3021, count#3022L])
        +- HiveTableScan [arithmetic_mean#2990, state_name#2998, date_local#3003], HiveTableRelation `daily_temp`.`daily_temp_2014`, org.apache.hadoop.hive.ql.io.orc.OrcSerde, [...], [date_local#3003]

Ваши советы и идеи высоко ценятся.

1 Ответ

0 голосов
/ 03 мая 2018

Здесь нет ничего неожиданного. Spark SQL не сохраняет информацию о секционировании внешнего источника (пока).

Если вы хотите оптимизировать тасование, у вас есть CLUSTER BY / bucketBy ваших данных. Если вы это сделаете, информация о разделе будет использоваться для оптимизации перемешивания.

Ссылка Как определить разбиение DataFrame?

...