Самое простое улучшение состоит в том, чтобы выполнять только как одиночное агрегирование, где предикат помещается в блок CASE ... WHEN ...
, и заменять countDistinct
приближенным эквивалентом
tAll
.groupBy("key1","key2")
.agg(
sum(
when($"column10" === "value1", $"column1")
).as("column1_sum"),
sum(
when($"column10" === "value1" and $"column11" === 1, $"column2")
).as("column2_sum"),
approx_count_distinct(
when($"column10" === "value3", $"column3")
).as("column3_count"))
.join(tAll, Seq("key1", "key2"), "right_outer"))
В зависимости от используемых функций иКроме знаний о распределении данных, вы также можете попытаться заменить агрегацию на оконные функции с похожей CASE ... WHEN ...
логикой
import org.apache.spark.sql.expressions.Window
val w = Window
.partitionBy("key1", "key2")
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
tAll
.withColumn(
"column1_sum",
sum(when($"column10" === "value1", $"column1")).over(w))
...
, но это часто менее устойчивый подход.
Вам также следует подумать о группированииbigTable
с использованием группирующих столбцов:
val n: Int = ??? // Number of buckets
bigTable.write.bucketBy(n, "key1", "key2").saveAsTable("big_table_clustered")
val bigTableClustered = spark.table("big_table_clustered")