Это пример того, как использовать UDAF. Очевидно, вам не нужен UDAF для объединения значений столбцов по id, но это позволяет добавить больше логики. Например, чтобы объединить значения по полю ID, вы можете создать UDAF, например:
class ConcatenateStrings extends UserDefinedAggregateFunction {
override def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)
override def bufferSchema: StructType = StructType(StructField("pair", StringType) :: Nil)
override def dataType: DataType = StringType
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val b = buffer.getAs[String](0)
val i = input.getAs[String](0)
buffer(0) = { if(b.isEmpty) b + i else b + " + " + i }
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val b1 = buffer1.getAs[String](0)
val b2 = buffer2.getAs[String](0)
if(!b1.isEmpty)
buffer1(0) = (b1) ++ " + " ++ (b2)
else
buffer1(0) = b2
}
override def evaluate(buffer: Row): Any = {
val yourString = buffer.getAs[String](0)
// Compute your logic and return another String
yourString + "@procesed"
}
}
Затем вы можете включить в свой вызов агрегации:
object testAppl0 {
def main(args: Array[String]) : Unit = {
val agg0 = new ConcatenateStrings()
implicit val spark: SparkSession =
SparkSession
.builder()
.appName("Test")
.master("local[1]")
.getOrCreate()
import spark.implicits._
val rows = Seq(Row(31898,"CP Bill Payment"), Row(31898,"CP e-Transfer + CP IMT"), Row(31898,"CPS Limit + CPS Payee "))
val schema = List(
StructField("ID", IntegerType, true),
StructField("Category", StringType, true))
val df = spark.createDataFrame(
spark.sparkContext.parallelize(rows),
StructType(schema)
)
df.groupBy("ID").agg(agg0($"Category")).show(false)
}
}
Будет возвращен новый столбец "concatenatestrings (Category)":
+-----+--------------------------------------------------------------------------+
|ID |concatenatestrings(Category) |
+-----+--------------------------------------------------------------------------+
|31898|CP Bill Payment + CP e-Transfer + CP IMT + CPS Limit + CPS Payee @procesed|
+-----+--------------------------------------------------------------------------+
Проверьте это, может быть, это может помочь