Благодаря моему исследованию у меня есть два решения этой проблемы.
Methed 1: изменение исходного кода с помощью SPARK-1893 , но я не рекомендую делать это.
Methed 2: Создание пользовательских агрегатных функций (UDAF) для себя. Хотя это хлопотно, но эффективно. Ниже приведен мой код, добро пожаловать, правильно!
class CollectList extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(StructField("id", StringType, nullable = true) :: StructField("state", StringType, nullable = true):: Nil)
override def bufferSchema: StructType = StructType(StructField("ids", ArrayType(StringType, containsNull = true), nullable = true) :: Nil)
override def dataType: ArrayType = ArrayType(StringType, containsNull = true)
override def deterministic: Boolean = false
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = null
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.get(0) == null){
buffer(0) = Array(input.getString(0) + "_" + input.getString(1))
}
else {
val s = input.getString(0) + "_" + input.getString(1)
val b = buffer.getAs[Seq[String]](0)
buffer(0) = b :+ s
}
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
if (buffer1.getAs[Seq[String]](0) == null){
buffer1(0) = buffer2.getAs[Seq[String]](0).distinct
}
else {
buffer1(0) = (buffer1.getAs[Seq[String]](0) ++ buffer2.getAs[Seq[String]](0)).distinct
}
}
override def evaluate(buffer: Row): Any = buffer.getAs[Seq[String]](0)
}