Прежде всего, код, который вы показали, содержит только одну операцию, похожую на действие - DataFrameWriter.save
.Все остальные компоненты ленивы.
Но лень тут не особо помогает.Самая большая проблема (при условии отсутствия уродливого перекоса данных или неправильной настройки широковещания) заключается в том, что отдельные агрегации требуют отдельных перемешиваний и дорогостоящего последующего слияния.
Наивным решением было бы использовать следующее:
кадр данных сгруппирован в те же столбцы
, чтобы перетасовать в первую очередь:
val groupColumns: Seq[Column] = ???
val sourceDataPartitioned = sourceData.groupBy(groupColumns: _*)
и использовать результат для вычисления отдельных агрегатов
val df1 = sourceDataPartitioned
...
val df2 = sourceDataPartitioned
...
Однако этоТакой подход довольно хрупок и вряд ли будет масштабироваться при наличии больших / искаженных групп.
Поэтому было бы намного лучше переписать ваш код для выполнения только агрегации.К счастью для вас, стандартное поведение SQL - это все, что вам нужно.
Давайте начнем с структурирования вашего кода в три кортежа элементов:
_1
, являющийсяПредикат (условие, которое вы используете с filter
). _2
- список из Columns
, для которого вы хотите вычислить агрегаты. _3
- функция агрегирования.
Где пример структуры может выглядеть так:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{count, min}
val ops: Seq[(Column, Seq[Column], Column => Column)] = Seq(
($"col1" === "a" and $"col2" === "b", Seq($"col3", $"col4"), count),
($"col2" === "b" and $"col3" === "c", Seq($"col4", $"col5"), min)
)
Теперь вы составляете агрегированные выражения, используя
agg_function(when(predicate, column))
pattern
import org.apache.spark.sql.functions.when
val exprs: Seq[Column] = ops.flatMap {
case (p, cols, f) => cols.map {
case c => f(when(p, c))
}
}
и используйте его на sourceData
sourceData.groupBy(groupColumns: _*).agg(exprs.head, exprs.tail: _*)
Добавьте aliases
при необходимости.