Если вы хотите, чтобы результат состоял из наборов Row
с, рассмотрите возможность преобразования в RDD следующим образом:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
def df = Seq(
(BigInt(10), "x", 1.0, 2.0, Map("a"->1, "b"->2)),
(BigInt(10), "y", 3.0, 4.0, Map("c"->3)),
(BigInt(20), "z", 5.0, 6.0, Map("d"->4, "e"->5))
).
toDF("c1", "c2", "c3", "c4", "c5").
// as[(BigInt, String, Double, Double, Map[String, Int])]
df.rdd.map(r => (r.getDecimal(0), r)).groupByKey.collect
// res1: Array[(java.math.BigDecimal, Iterable[org.apache.spark.sql.Row])] = Array(
// (10,CompactBuffer([10,x,1.0,2.0,Map(a -> 1, b -> 2)], [10,y,3.0,4.0,Map(c -> 3)])),
// (20,CompactBuffer([20,z,5.0,6.0,Map(d -> 4, e -> 5)]))
// )
Или, если вы хорошо справляетесь с коллекциями строк типа struct
в DataFrame, вот альтернативный подход:
val cols = ds.columns
df.groupBy("c1").agg(collect_list(struct(cols.head, cols.tail: _*)).as("row_list")).
show(false)
// +---+----------------------------------------------------------------+
// |c1 |row_list |
// +---+----------------------------------------------------------------+
// |20 |[[20,z,5.0,6.0,Map(d -> 4, e -> 5)]] |
// |10 |[[10,x,1.0,2.0,Map(a -> 1, b -> 2)], [10,y,3.0,4.0,Map(c -> 3)]]|
// +---+----------------------------------------------------------------+