Полагаю, вы пытаетесь достичь того же результата, что и ваш другой вопрос . Ваша логика кажется идеальной, но вы допустили определенные ошибки. Итак, начнем с ваших ошибок
Ошибка 1
, если вы определите набор данных как
val foos = Seq(Foo(-18, "Z"),
Foo(-11, "G"),
Foo(-8, "A"),
Foo(-4, "C"),
Foo(-1,"F")).toDS()
Вы получите набор данных с схемой как
+---+---+
|a |b |
+---+---+
|-18|Z |
|-11|G |
|-8 |A |
|-4 |C |
|-1 |F |
+---+---+
root
|-- a: integer (nullable = false)
|-- b: string (nullable = true)
Как вы можете видеть, Объект структуры Foo уже разделен на разные столбцы (это не то, что вам нужно)
Ошибка 2
Вы определили вашу udf
функцию как
def slidingUdf = udf((list1: Seq[Foo] )=> {...
невозможно использовать пользовательские типы данных во входных аргументах функций udf . Вы можете просто использовать типы данных sql только
Ошибка 3
$"*"
не собирает все значения столбцов в диапазоне, который вы определили в оконной функции . Вы должны использовать функцию, либо встроенную, либо пользовательскую
Есть и другие мелкие ошибки .
Теперь давайте исправим ошибки
решено 1
Вы должны определить набор данных как
val foos = Seq(
(Foo(-18, "Z"), "test"),
(Foo(-11, "G"), "test"),
(Foo(-8, "A"), "test"),
(Foo(-4, "C"), "test"),
(Foo(-1,"F"), "test"))
.toDF("foo", "test")
что должно дать вам
+--------+----+
|foo |test|
+--------+----+
|[-18, Z]|test|
|[-11, G]|test|
|[-8, A] |test|
|[-4, C] |test|
|[-1, F] |test|
+--------+----+
root
|-- foo: struct (nullable = true)
| |-- a: integer (nullable = false)
| |-- b: string (nullable = true)
|-- test: string (nullable = true)
Здесь test
столбец равен фиктивный столбец , которым можно пренебречь, но был необходим для создания набора данных
решено 2
рабочая udf
функция должна быть такой, как показано ниже
def slidingUdf = udf((list1: Seq[Row] )=> {
if(list1.size < 3) {
null
}
else {
FooResult(list1(0).getAs[Int]("a"), list1(0).getAs[String]("b"), list1(1).getAs[Int]("a"), list1(1).getAs[String]("b"), list1(2).getAs[Int]("a"), list1(2).getAs[String]("b"), list1(0).getAs[Int]("a") + list1(1).getAs[Int]("a"), list1(0).getAs[String]("b") + list1(1).getAs[String]("b") + list1(2).getAs[String]("b"))
}
})
Я использовал Row
, поскольку - это тип данных для struct
столбцов, а также посмотрите, как данные извлекаются из Row
решено 3
Вы должны использовать collect_list
встроенную функцию с window
функцией как
foos.select(slidingUdf(collect_list("foo").over(sliding_window_spec)).as("test"))
.filter(col("test").isNotNull)
.select(col("test.*"))
.show(false)
Итак, общий рабочий код
val foos = Seq(
(Foo(-18, "Z"), "test"),
(Foo(-11, "G"), "test"),
(Foo(-8, "A"), "test"),
(Foo(-4, "C"), "test"),
(Foo(-1,"F"), "test"))
.toDF("foo", "test")
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
// work on 3 rows
val sliding_window_spec = Window.orderBy(desc("foo.a")).rowsBetween( -2, 0)
def slidingUdf = udf((list1: Seq[Row] )=> {
if(list1.size < 3) {
null
}
else {
FooResult(list1(0).getAs[Int]("a"), list1(0).getAs[String]("b"), list1(1).getAs[Int]("a"), list1(1).getAs[String]("b"), list1(2).getAs[Int]("a"), list1(2).getAs[String]("b"), list1(0).getAs[Int]("a") + list1(1).getAs[Int]("a"), list1(0).getAs[String]("b") + list1(1).getAs[String]("b") + list1(2).getAs[String]("b"))
}
})
foos.select(slidingUdf(collect_list("foo").over(sliding_window_spec)).as("test"))
.filter(col("test").isNotNull)
.select(col("test.*"))
.show(false)
что должно дать вам
+---+---+---+---+---+---+------------+------------+
|a1 |b1 |a2 |b2 |a3 |b3 |computation1|computation2|
+---+---+---+---+---+---+------------+------------+
|-1 |F |-4 |C |-8 |A |-5 |FCA |
|-4 |C |-8 |A |-11|G |-12 |CAG |
|-8 |A |-11|G |-18|Z |-19 |AGZ |
+---+---+---+---+---+---+------------+------------+
Вы можете поиграть для большего обучения и понимания. Надеюсь, ответ более чем полезен;)