Я думаю, что самый эффективный способ - это выполнить агрегацию, а затем создать новый фрейм данных.Таким образом вы избежите дорогостоящего explode
.
Сначала давайте создадим фрейм данных.Кстати, всегда приятно предоставить код, чтобы сделать это, когда вы задаете вопрос.Таким образом, мы можем воспроизвести вашу проблему за считанные секунды.
val df = Seq((1, 1, 0, 0, 1), (1, 1, 5, 0, 0),
(0, 1, 0, 6, 0), (0, 1, 0, 4, 3))
.toDF("output_label", "ID", "C1", "C2", "C3")
Затем мы создаем список интересующих нас столбцов, агрегирование и вычисляем результат.
val cols = (1 to 3).map(i => s"C$i")
val aggs = cols.map(name => sum(col(name)).as(name))
val agg_df = df.agg(aggs.head, aggs.tail :_*) // See the note below
agg_df.show
+---+---+---+
| C1| C2| C3|
+---+---+---+
| 5| 10| 4|
+---+---+---+
МыПочти у нас есть все, что нам нужно, нам просто нужно собрать данные и построить новый фрейм данных:
val agg_row = agg_df.first
cols.map(name => name -> agg_row.getAs[Long](name))
.toDF("column", "sum")
.show
+------+---+
|column|sum|
+------+---+
| C1| 5|
| C2| 10|
| C3| 4|
+------+---+
РЕДАКТИРОВАТЬ:
NB: df.agg(aggs.head, aggs.tail :_*)
может показаться странным.Идея состоит в том, чтобы просто вычислить все агрегаты, вычисленные в aggs
.Можно ожидать чего-то более простого, например df.agg(aggs : _*)
.Тем не менее, подпись метода agg
выглядит следующим образом:
def agg(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)
возможно, чтобы гарантировать использование хотя бы одного столбца, и поэтому вам нужно разделить aggs
на aggs.head
и aggs.tail
.