Мне было трудно выразить это с помощью Spark SQL, но я справился, используя функциональное программирование через Dataset API.
scala> case class Food(category: String, a: Int, b: Option[Int] = None) // b holds the running max of `a`; defaults to None on input rows
defined class Food
scala> val ds = spark.createDataset( // sample data: `b` is left as None (default) on every row
| List(
| Food("Fruit", 1),
| Food("Fruit", 5),
| Food("Fruit", 3),
| Food("Fruit", 4),
| Food("Dessert", 4),
| Food("Dessert", 2),
| Food("Veggies", 11),
| Food("Veggies", 7),
| Food("Veggies", 12),
| Food("Veggies", 3)
| )
| )
ds: org.apache.spark.sql.Dataset[Food] = [category: string, a: int ... 1 more field]
scala> ds.show
+--------+---+----+
|category| a| b|
+--------+---+----+
| Fruit| 1|null|
| Fruit| 5|null|
| Fruit| 3|null|
| Fruit| 4|null|
| Dessert| 4|null|
| Dessert| 2|null|
| Veggies| 11|null|
| Veggies| 7|null|
| Veggies| 12|null|
| Veggies| 3|null|
+--------+---+----+
scala> :paste
// Entering paste mode (ctrl-D to finish)
// Compute a running maximum of `a` within each category, carried in `b`.
// NOTE(review): the "running" max follows the group's iteration order;
// Spark does not promise a particular row order inside a group — confirm
// ordering does not matter for the caller.
ds.groupByKey(_.category)
.flatMapGroups { (key, iter) =>
if (iter.hasNext) {
val head = iter.next
// scanLeft emits the seed first and then one element per remaining row,
// so every input row (head included) appears exactly once in the output.
// Invariant: x.b == Some(max of all `a` values seen so far, x.a included).
iter.scanLeft(head.copy(b = Some(head.a))) { (x, y) =>
val a = x.b.map(b => if(x.a > b) x.a else b).getOrElse(x.a) // max so far (comparing against x.a again is redundant given the invariant, but harmless)
y.copy(b = if(y.a > a) Some(y.a) else Some(a)) // extend the running max with the current row's `a`
}
} else iter // empty group: emit nothing
}
.show
// Exiting paste mode, now interpreting.
+--------+---+---+
|category| a| b|
+--------+---+---+
| Veggies| 11| 11|
| Veggies| 7| 11|
| Veggies| 12| 12|
| Veggies| 3| 12|
| Dessert| 4| 4|
| Dessert| 2| 4|
| Fruit| 1| 1|
| Fruit| 5| 5|
| Fruit| 3| 5|
| Fruit| 4| 5|
+--------+---+---+