Сохранение не более 10 дубликатов в наборе данных - PullRequest
1 голос
/ 07 мая 2020

Я экспериментировал с большим набором данных с помощью Spark. Одна из основных проблем с моими данными заключается в том, что у них есть дубликаты, и я хочу их удалить. Но мой случай удаления немного отличается, так как я хотел бы сохранить не более 10 дубликатов в моих данных и удалить все остальное.

Я экспериментировал с функцией .dropDuplicates() только для того, чтобы выяснить, что она удаляет все дубликаты из набора данных. Здесь приветствуются любая помощь или указатели.

Ответы [ 2 ]

1 голос
/ 07 мая 2020

Вы можете использовать Window для разделения данных по всем столбцам. Каждый раздел будет содержать все одинаковые (т. Е. Дублированные) строки.

Затем мы можем использовать row_number() для нумерации строк в каждом разделе и удаления тех, которые находятся выше порогового значения. Это можно сделать следующим образом (в Scala):

val maxDuplicate = 10

// partiton over all columns, ordering by one of them
val w = Window.partitionBy(df.columns.head, df.columns.tail: _*).orderBy(df.columns.head)

df.withColumn("rowNum", row_number().over(w))
  .filter($"rowNum" <= maxDuplicate)
  .drop("rowNum")
1 голос
/ 07 мая 2020

Проверьте код ниже.

scala> val df = Seq(("Srinivas",1),("Ravi",2),("Srinivas",1),("Srinivas",1),("Kumar",3),("Srinivas",1),("Srinivas",1),("Srinivas",1),("Srinivas",1)).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df.orderBy($"name").show(false) // 7 duplicate names.
+--------+---+
|name    |age|
+--------+---+
|Kumar   |3  |
|Ravi    |2  |
|Srinivas|1  |
|Srinivas|1  |
|Srinivas|1  |
|Srinivas|1  |
|Srinivas|1  |
|Srinivas|1  |
|Srinivas|1  |
+--------+---+


scala> val keepOnly = 4
keepOnly: Int = 4

scala> val duplicates = df.withColumn("rowid",row_number().over(Window.partitionBy($"name",$"age").orderBy($"name",$"age"))).withColumn("rowid",when($"rowid" > keepOnly,lit(1)).otherwise($"rowid"))
duplicates: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]

scala> duplicates.show(false)
+--------+---+-----+
|name    |age|rowid|
+--------+---+-----+
|Srinivas|1  |1    |
|Srinivas|1  |2    |
|Srinivas|1  |3    |
|Srinivas|1  |4    |
|Srinivas|1  |1    |
|Srinivas|1  |1    |
|Srinivas|1  |1    |
|Kumar   |3  |1    |
|Ravi    |2  |1    |
+--------+---+-----+


scala> duplicates.drop
drop   dropDuplicates

scala> duplicates.dropDuplicates.drop("rowid").show(false)
+--------+---+
|name    |age|
+--------+---+
|Srinivas|1  |
|Srinivas|1  |
|Srinivas|1  |
|Srinivas|1  |
|Kumar   |3  |
|Ravi    |2  |
+--------+---+

...