Поскольку в этот момент вы можете иметь только несколько строк, вы просто собираете атрибуты как есть и выравниваете результат (Spark> = 2,4)
import org.apache.spark.sql.functions.{collect_set, flatten, array_distinct}
val byState = Seq(
("Canada", "America", Seq("A", "B")),
("Belgium", "Europe", Seq("Z")),
("USA", "America", Seq("A")),
("France", "Europe", Seq("Y", "X"))
).toDF("country", "continent", "attributes")
byState
.groupBy("continent")
.agg(array_distinct(flatten(collect_set($"attributes"))) as "attributes")
.show
+---------+----------+
|continent|attributes|
+---------+----------+
| Europe| [Y, X, Z]|
| America| [A, B]|
+---------+----------+
В общем случае всеГораздо сложнее обрабатывать, и во многих случаях, если вы ожидаете больших списков, с большим количеством дубликатов и множеством значений в группе, оптимальным решением * будет просто пересчитать результаты с нуля, то есть
input.groupBy($"continent").agg(collect_set($"attributes") as "attributes")
Одна из возможных альтернативдолжен использовать Aggregator
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{Encoder, Encoders}
import scala.collection.mutable.{Set => MSet}
class MergeSets[T, U](f: T => Seq[U])(implicit enc: Encoder[Seq[U]]) extends
Aggregator[T, MSet[U], Seq[U]] with Serializable {
def zero = MSet.empty[U]
def reduce(acc: MSet[U], x: T) = {
for { v <- f(x) } acc.add(v)
acc
}
def merge(acc1: MSet[U], acc2: MSet[U]) = {
acc1 ++= acc2
}
def finish(acc: MSet[U]) = acc.toSeq
def bufferEncoder: Encoder[MSet[U]] = Encoders.kryo[MSet[U]]
def outputEncoder: Encoder[Seq[U]] = enc
}
и применять его следующим образом
case class CountryAggregate(
country: String, continent: String, attributes: Seq[String])
byState
.as[CountryAggregate]
.groupByKey(_.continent)
.agg(new MergeSets[CountryAggregate, String](_.attributes).toColumn)
.toDF("continent", "attributes")
.show
+---------+----------+
|continent|attributes|
+---------+----------+
| Europe| [X, Y, Z]|
| America| [B, A]|
+---------+----------+
, но это явно не дружественный для Java вариант.
См. также Как объединить значения в коллекцию после groupBy? (аналогично, но без ограничения уникальности).
* Это потому, что explode
может быть довольно дорогим, особенно в старых версиях Spark,такой же, как доступ к внешнему представлению коллекций SQL.