Spark Transactional удалить строки - PullRequest
0 голосов
/ 08 мая 2018

Я работаю с фреймами данных в Scala в банковском процессе, и мне нужно удалить некоторые строки, если транзакция отменяется. Например, если у меня есть отмена, я должен удалить предыдущую строку. В случае, если у меня есть три непрерывных отмены, я должен удалить 3 предыдущих строки.

Начальная датафрейма:

enter image description here

Ожидается датафрейм

enter image description here

Буду признателен за вашу помощь.

Ответы [ 2 ]

0 голосов
/ 08 мая 2018

Сочетание встроенных функций , udf и функции window должно помочь вам получить желаемый результат ( прокомментировано для ясности )

import org.apache.spark.sql.expressions._
def windowSpec = Window.partitionBy("Account").orderBy("Sequence").rowsBetween(Long.MinValue, Long.MaxValue)

import org.apache.spark.sql.functions._
def filterUdf = udf((array:Seq[Long], sequence: Long)=> !array.contains(sequence))

df.withColumn("collection", sum(when(col("Type") === "Cancellation", 1).otherwise(0)).over(windowSpec))   //getting the count of cancellation in each group
    .withColumn("Sequence", when(col("Type") === "Cancellation", col("Sequence")-col("collection")).otherwise(col("Sequence")))   //getting the difference between count and sequence number to get the sequence number of previous
    .withColumn("collection", collect_set(when(col("Type") === "Cancellation", col("Sequence")).otherwise(0)).over(windowSpec))   //collecting the differenced sequence number of cancellation 
    .filter(filterUdf(col("collection"), col("Sequence")))            //filtering out the rows calling the udf 
    .drop("collection")
  .show(false)

, который должен дать вам

+-------+-----------+--------+
|Account|Type       |Sequence|
+-------+-----------+--------+
|11047  |Aggregation|11      |
|1030583|Aggregation|1       |
|1030583|Aggregation|4       |
+-------+-----------+--------+

Примечание. Это решение работает только при последовательном аннулировании в каждой группе Account

0 голосов
/ 08 мая 2018

Я думаю, что в данном случае полезна структура данных Map of stack, ключом которой является идентификатор учетной записи. Вы помещаете строки Agg в стек до тех пор, пока не встретите Cancel, а затем выталкиваете стек.

...