Я пытался решить эту проблему последние три дня и, наконец, смог обойти ее, используя UserDefinedAggregateFunction
.Вот пример кода для того же
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
class CustomAggregator extends UserDefinedAggregateFunction {
override def inputSchema: org.apache.spark.sql.types.StructType =
StructType(Array(StructField("key", StringType), StructField("value", ArrayType(StringType))))
// This is the internal fields you keep for computing your aggregate
override def bufferSchema: StructType = StructType(
Array(StructField("mapData", MapType(StringType, ArrayType(StringType))))
)
// This is the output type of your aggregatation function.
override def dataType = StringType
override def deterministic: Boolean = true
// This is the initial value for your buffer schema.
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = scala.collection.mutable.Map[String, String]()
}
// This is how to update your buffer schema given an input.
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getMap(0) + (input.getAs[String](0) -> input.getAs[String](1))
}
// This is how to merge two objects with the bufferSchema type.
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getAs[Map[String, Any]](0) ++ buffer2.getAs[Map[String, Any]](0))
}
// This is where you output the final value, given the final value of your bufferSchema.
override def evaluate(buffer: Row): Any = {
val map = buffer(0).asInstanceOf[Map[Any, Any]]
val buff: ListBuffer[String] = ListBuffer()
for ((k, v) <- map) {
val valArray = v.asInstanceOf[mutable.WrappedArray[Any]].array;
val tmp = {
for {
valString <- valArray
} yield valString.toString
}.toList.mkString(",")
buff += "\"" + k.toString + "\":[" + tmp + "]"
}
"{" + buff.toList.mkString(",") + "}"
}
}
Теперь используйте customAggregator,
val ca = new CustomAggregator
val df5 = df4.groupBy("id").agg(ca(col("executor_id"), col("data")).as("jsonData"))
Результирующий DF равен
scala> df5.show(false)
+---+-----------------------------------------------------------------------+
|id |jsonData |
+---+-----------------------------------------------------------------------+
|1 |{"e1":[{"number":"n1","data":"d1"}],"e2":[{"number":"n1","data":"d1"}]}|
+---+-----------------------------------------------------------------------+
Даже если я решил эту проблему, яне уверен, правильно ли это или нет.Причины моих сомнений:
- Местами я использовал
Any
.Я не думаю, что это правильно. - Для каждой оценки я создаю ListBuffer и многие другие типы данных.Я не уверен насчет производительности кода.
- Мне все еще приходится тестировать код для многих типов данных, таких как double, дата tpye, вложенный json и т. Д. В качестве данных.