Если вам нужно обрабатывать операции на основе окна Spark без использования функций spark Window и Spark sql, вы можете сделать это с помощью UDAF. Работа с UDAF и UDF - это то, что в некоторых блогах не рекомендуется использовать, если в этом нет необходимости. Но если вы можете позволить себе потерять некоторую производительность и, возможно, большие паузы в GC, вы можете попытаться поиграть с вашими собственными преобразованиями / агрегациями Spark.
Пример:
предположим, что вы хотите выполнить, и некоторое окно слайдов в вашем наборе данных, которое может быть представлено как:
item: String, key: String, timestamp: Long, field1:String, field2:Int, field3:Int, field4:Int
И вы хотите, например, реализовать приращение field2 в качестве нового поля вашего фрейма данных, и вы хотите сделать это без использования Spark sql и вам нужно использовать систему типов Scala, например, вы хотите выполнить операцию между двумя строками, используя экземпляр Monoid . Возможно, в этом случае лучше работать напрямую с RDD ... Ниже приведен пример работы с Dataframe api.
Одновременная работа с фреймами данных и типами Scala несколько обременительна, поскольку приходится иметь дело с обоими типами семейств:
Вы должны реализовать абстрактные члены UDAF:
class GenericAggregate(id: StringType, in: IntegerType, sort: LongType, output:IntegerType)(f: Seq[(String, Int)] => Seq[(String, Int)]) {
private val mapType = MapType(id, MapType(sort, output, true), true)
// This is the schema for your UDAF. The aggregation needs three fields from the input dataframe
override def inputSchema: StructType =
StructType(
StructField("id_schema0", id) :: StructField("sort_schema0", sort) :: StructField(
"input_schema0",
in) :: Nil)
// This is the internal fields you keep for computing your aggregate.
override def bufferSchema: StructType =
StructType(StructField("internal_buffer", mapType) :: Nil)
}
// This kind of aggregation returns a key-value: key -> delta
override def dataType: DataType = MapType(id, output)
override def deterministic: Boolean = true
// This is the initial value for your buffer schema.
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Map("" -> 0)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Map[String, Map[Long, Int]]](0) + (input.getAs[String](0) -> Map(
input.getAs[Long](1) -> input.getAs[Int](2)))
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Map[String, Map[Long, Int]]](0) ++ buffer2
.getAs[Map[String, Map[Long, Int]]](0)
}
override def evaluate(buffer: Row): Any = {
val map = buffer.getAs[Map[String, Map[Long, Int]]](0)
// You need to create a Seq from the map
val toSeq0 = map.mapValues(_.head)
// As window functions you must order your events before applying the function
val toSeq1 = toSeq0.toSeq.sortBy(_._2._1)
val sequence = toSeq1.map(el => (el._1, el._2._2))
/*
For example, if your internal map is val in =
Map("ke1" -> Map(1L -> 3), "key2" -> Map(2L -> 3))
you will get ArrayBuffer((ke1,3), (key2,3))
*/
val result = f(sequence)
/*
As a result you will have another ArrayBuffer with your new data
Map("ke1" -> Map(1L -> 1), "key2" -> Map(2L -> 1))
*/
result.toMap - k.initKey
}
}
В этом примере карта используется для создания агрегации, но вам необходимо предоставить функцию, которая работает с этой коллекцией и возвращает новое поле.
Почему это необходимо? Ну, в случае, если вам нужно абстрагироваться или создать DSL поверх Spark Sql с более сложными типами, вы можете использовать деривацию классов типов для создания пользовательских оконных / агрегатных функций для ваших продуктов. Но, как я уже говорил, работа напрямую с функциями Spark SQL настоятельно рекомендуется в большинстве случаев . Эти альтернативы помогают лучше понять, как работает Spark, и дают возможность поиграть с компилятором, чтобы построить более общие конвейеры данных или даже создать DSL, которые могут быть выполнены через Spark Sql.
Надеюсь, это поможет.