Можно, но логика UDAF была бы другой.Например, если у вас есть две строки:
val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))
val rdd = spark.sparkContext.parallelize(seq)
Здесь схема имеет вид
root
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
, тогда, если вы попытаетесь вызвать агрегацию:
val df = seq.toDF
df.agg(agg0(col("cars")))
YouВы должны изменить свою схему ввода UDAF, например:
val carsSchema =
StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))
, и в мальчике вашего UDAF вы должны иметь дело с этой схемой, изменяя inputSchema:
override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)
В вашем методе обновления вы должны иметь делос форматом ваших строк ввода:
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val i = input.getAs[Array[Array[String]]](0)
// i here would be [car1,car2,car3], an array of strings
buffer(0) = ???
}
Отсюда вы можете преобразовать i для обновления буфера и завершения функций слияния и оценки.