Сочетание встроенных функций , udf
и функции window
должно помочь вам получить желаемый результат ( прокомментировано для ясности )
import org.apache.spark.sql.expressions._
def windowSpec = Window.partitionBy("Account").orderBy("Sequence").rowsBetween(Long.MinValue, Long.MaxValue)
import org.apache.spark.sql.functions._
def filterUdf = udf((array:Seq[Long], sequence: Long)=> !array.contains(sequence))
df.withColumn("collection", sum(when(col("Type") === "Cancellation", 1).otherwise(0)).over(windowSpec)) //getting the count of cancellation in each group
.withColumn("Sequence", when(col("Type") === "Cancellation", col("Sequence")-col("collection")).otherwise(col("Sequence"))) //getting the difference between count and sequence number to get the sequence number of previous
.withColumn("collection", collect_set(when(col("Type") === "Cancellation", col("Sequence")).otherwise(0)).over(windowSpec)) //collecting the differenced sequence number of cancellation
.filter(filterUdf(col("collection"), col("Sequence"))) //filtering out the rows calling the udf
.drop("collection")
.show(false)
, который должен дать вам
+-------+-----------+--------+
|Account|Type |Sequence|
+-------+-----------+--------+
|11047 |Aggregation|11 |
|1030583|Aggregation|1 |
|1030583|Aggregation|4 |
+-------+-----------+--------+
Примечание. Это решение работает только при последовательном аннулировании в каждой группе Account