Функция collect_list
в Spark позволяет объединять оконные значения в виде списка.Этот список может быть передан udf
для выполнения некоторых сложных вычислений
Так что, если у вас есть источник
val data = List(
("XSC", "1986-05-21", 44.7530),
("XSC", "1986-05-22", 44.7530),
("XSC", "1986-05-23", 23.5678),
("TM", "1982-03-08", 22.2734),
("TM", "1982-03-09", 22.1941),
("TM", "1982-03-10", 22.0847),
("TM", "1982-03-11", 22.1741),
("TM", "1982-03-12", 22.1840),
("TM", "1982-03-15", 22.1344),
).toDF("id", "timestamp", "feature")
.withColumn("timestamp", to_date('timestamp))
И некоторая сложная функция, заключенная в UDF вашей записи (представленанапример, кортеж)
val complexComputationUDF = udf((list: Seq[Row]) => {
list
.map(row => (row.getString(0), row.getDate(1).getTime, row.getDouble(2)))
.sortBy(-_._2)
.foldLeft(0.0) {
case (acc, (id, timestamp, feature)) => acc + feature
}
})
Вы можете определить либо окно, которое передает все секционированные данные в каждую запись, либо, в случае упорядоченного окна, инкрементные данные в каждую запись
val windowAll = Window.partitionBy("id")
val windowRunning = Window.partitionBy("id").orderBy("timestamp")
И объедините все это в новый набор данных, например:
val newData = data
// I assuming thatyou need id,timestamp & feature for the complex computattion. So I create a struct
.withColumn("record", struct('id, 'timestamp, 'feature))
// Collect all records in the partition as a list of tuples and pass them to the complexComupation
.withColumn("computedValueAll",
complexComupationUDF(collect_list('record).over(windowAll)))
// Collect records in a time ordered windows in the partition as a list of tuples and pass them to the complexComupation
.withColumn("computedValueRunning",
complexComupationUDF(collect_list('record).over(windowRunning)))
Это приведет к чему-то вроде:
+---+----------+-------+--------------------------+------------------+--------------------+
|id |timestamp |feature|record |computedValueAll |computedValueRunning|
+---+----------+-------+--------------------------+------------------+--------------------+
|XSC|1986-05-21|44.753 |[XSC, 1986-05-21, 44.753] |113.07379999999999|44.753 |
|XSC|1986-05-22|44.753 |[XSC, 1986-05-22, 44.753] |113.07379999999999|89.506 |
|XSC|1986-05-23|23.5678|[XSC, 1986-05-23, 23.5678]|113.07379999999999|113.07379999999999 |
|TM |1982-03-08|22.2734|[TM, 1982-03-08, 22.2734] |133.0447 |22.2734 |
|TM |1982-03-09|22.1941|[TM, 1982-03-09, 22.1941] |133.0447 |44.4675 |
|TM |1982-03-10|22.0847|[TM, 1982-03-10, 22.0847] |133.0447 |66.5522 |
|TM |1982-03-11|22.1741|[TM, 1982-03-11, 22.1741] |133.0447 |88.7263 |
|TM |1982-03-12|22.184 |[TM, 1982-03-12, 22.184] |133.0447 |110.91029999999999 |
|TM |1982-03-15|22.1344|[TM, 1982-03-15, 22.1344] |133.0447 |133.0447 |
+---+----------+-------+--------------------------+------------------+--------------------+