Spark Scala: несколько запросов к одной и той же таблице - PullRequest
0 голосов
/ 17 сентября 2018

Я пытаюсь запросить несколько столбцов из одной таблицы (bigTable), чтобы сгенерировать несколько агрегированных столбцов (column1_sum, column2_sum, column3_count). В конце я объединяю все столбцы в одну таблицу.

Код ниже

val t1 = bigTable
            .filter($"column10" === value1)
            .groupBy("key1","key2")
            .agg(sum("column1") as "column1_sum")

val t2 = bigTable
            .filter($"column11"===1)
            .filter($"column10" === value1)
            .groupBy("key1","key2")
            .agg(sum("column2") as "column2_sum")

val t3 = bigTable
            .filter($"column10" === value3)
            .groupBy("key1","key2")
            .agg(countDistinct("column3") as "column3_count")

tAll
            .join(t1,Seq("key1","key2"),"left_outer")
            .join(t2,Seq("key1","key2"),"left_outer")
            .join(t3,Seq("key1","key2"),"left_outer")

Проблемы с указанным кодом

bigTable - это огромная таблица (она состоит из миллионов строк). Таким образом, многократные запросы не эффективны. Выполнение запроса занимает много времени.

Любые идеи о том, как я мог бы добиться того же результата более эффективным способом? Есть ли способ запросить bigTable меньшее количество раз?

Заранее большое спасибо.

Ответы [ 2 ]

0 голосов
/ 18 сентября 2018

Одним из основных улучшений в моем коде будет один раз запросить bigTable, а не несколько раз, как указано в вопросе.

Кусок кода, который я пробую (мой код похожэто просто иллюстрация):

bigTable
    .filter($"column10" === value1)
    .groupBy("key1", "key2")
    .agg(
      sum("column1") as "column1_sum",
      sum("column2") as "column2_sum",
      countDistinct(when($"column11"===1, col("column3"))) as "column3_count"
)
0 голосов
/ 17 сентября 2018

Самое простое улучшение состоит в том, чтобы выполнять только как одиночное агрегирование, где предикат помещается в блок CASE ... WHEN ..., и заменять countDistinct приближенным эквивалентом

tAll
  .groupBy("key1","key2")
  .agg(
    sum(
      when($"column10" === "value1", $"column1")
    ).as("column1_sum"),
    sum(
      when($"column10" === "value1" and $"column11" === 1, $"column2")
    ).as("column2_sum"),
    approx_count_distinct(
      when($"column10" === "value3", $"column3")
    ).as("column3_count"))
  .join(tAll, Seq("key1", "key2"), "right_outer"))

В зависимости от используемых функций иКроме знаний о распределении данных, вы также можете попытаться заменить агрегацию на оконные функции с похожей CASE ... WHEN ... логикой

import org.apache.spark.sql.expressions.Window

val w = Window
 .partitionBy("key1", "key2")
 .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

tAll
  .withColumn(
    "column1_sum", 
    sum(when($"column10" === "value1", $"column1")).over(w))
 ...

, но это часто менее устойчивый подход.

Вам также следует подумать о группированииbigTable с использованием группирующих столбцов:

val n: Int = ???  // Number of buckets
bigTable.write.bucketBy(n, "key1", "key2").saveAsTable("big_table_clustered")

val bigTableClustered = spark.table("big_table_clustered")
...