Ваше требование кажется простым, но реализация в spark и с сохранением неизменности является сложной задачей.Я предлагаю вам использовать рекурсивную функцию для генерации столбца steps
.Ниже я попытался предложить вам рекурсивный способ с использованием функции udf
.
import org.apache.spark.sql.functions._
//udf function to populate step column
def stepsUdf = udf((values: Seq[Row]) => {
//sorting the collected struct in reverse order according to val1 column in reverse order
val val12 = values.sortWith(_.getAs[Int]("val1") > _.getAs[Int]("val1"))
//selecting the first of sorted list
val val12Head = val12.head
//generating the first step column in the collected list
val prevStep = if(val12Head.getAs("val2") != 0) 0 else -1
//generating the first output struct
val listSteps = List(steps(val12Head.getAs("val1"), val12Head.getAs("val2"), prevStep))
//recursive function for generating the step column
def recursiveSteps(vals : List[Row], previousStep: Int, listStep : List[steps]): List[steps] = vals match {
case x :: y =>
//event changed so step column should be 0
if(x.getAs("val2") != 0) {
recursiveSteps(y, 0, listStep :+ steps(x.getAs("val1"), x.getAs("val2"), 0))
}
//event doesn't change after the last event change
else if(x.getAs("val2") == 0 && previousStep == -1) {
recursiveSteps(y, previousStep, listStep :+ steps(x.getAs("val1"), x.getAs("val2"), previousStep))
}
//val2 is 0 after the event change so increament the step column
else {
recursiveSteps(y, previousStep+1, listStep :+ steps(x.getAs("val1"), x.getAs("val2"), previousStep+1))
}
case Nil => listStep
}
//calling the recursive function
recursiveSteps(val12.tail.toList, prevStep, listSteps)
})
df
.groupBy("id") // grouping by id column
.agg(stepsUdf(collect_list(struct("val1", "val2"))).as("stepped")) //calling udf function after the collection of struct of val1 and val2
.withColumn("stepped", explode(col("stepped"))) // generating rows from the list returned from udf function
.select(col("id"), col("stepped.*")) // final desired output
.sort("id", "val1") //optional step just for viewing
.show(false)
где steps - это класс дела
case class steps(val1: Int, val2: Int, steps: Int)
, который должен дать вам
+---+----+----+-----+
|id |val1|val2|steps|
+---+----+----+-----+
|1 |1 |0 |4 |
|1 |2 |0 |3 |
|1 |3 |0 |2 |
|1 |4 |0 |1 |
|1 |5 |5 |0 |
|1 |6 |0 |3 |
|1 |7 |0 |2 |
|1 |8 |0 |1 |
|1 |9 |9 |0 |
|1 |10 |0 |-1 |
|1 |11 |0 |-1 |
|2 |1 |0 |5 |
|2 |2 |0 |4 |
|2 |3 |0 |3 |
|2 |4 |0 |2 |
|2 |5 |0 |1 |
|2 |6 |6 |0 |
|2 |7 |0 |1 |
|2 |8 |8 |0 |
|2 |9 |0 |-1 |
+---+----+----+-----+
Надеюсь, ответ полезен