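Проверьте код ниже: он оставляет не более `keepOnly` одинаковых строк (с одинаковыми `name` и `age`) и удаляет остальные дубликаты.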
// Sample data: (name, age) pairs in which the row ("Srinivas", 1) is repeated on purpose.
scala> val df = Seq(("Srinivas",1),("Ravi",2),("Srinivas",1),("Srinivas",1),("Kumar",3),("Srinivas",1),("Srinivas",1),("Srinivas",1),("Srinivas",1)).toDF("name","age")
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
// Sort by name so the repeated rows are listed next to each other.
scala> df.orderBy($"name").show(false) // ("Srinivas", 1) occurs 7 times.
+--------+---+
|name |age|
+--------+---+
|Kumar |3 |
|Ravi |2 |
|Srinivas|1 |
|Srinivas|1 |
|Srinivas|1 |
|Srinivas|1 |
|Srinivas|1 |
|Srinivas|1 |
|Srinivas|1 |
+--------+---+
// Maximum number of identical (name, age) rows to keep.
scala> val keepOnly = 4
keepOnly: Int = 4
// Number the rows inside each (name, age) group: 1, 2, 3, ...
// All rows of a group are identical, so the ordering by the partition keys is arbitrary;
// it is only there because row_number() requires an ordered window.
scala> val duplicates = df.withColumn("rowid",row_number().over(Window.partitionBy($"name",$"age").orderBy($"name",$"age")))
duplicates: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> duplicates.show(false)
+--------+---+-----+
|name |age|rowid|
+--------+---+-----+
|Srinivas|1 |1 |
|Srinivas|1 |2 |
|Srinivas|1 |3 |
|Srinivas|1 |4 |
|Srinivas|1 |5 |
|Srinivas|1 |6 |
|Srinivas|1 |7 |
|Kumar |3 |1 |
|Ravi |2 |1 |
+--------+---+-----+
// Keep only the first keepOnly rows of every group, then remove the helper column.
// A direct filter on rowid states the limit explicitly, needs no dropDuplicates step,
// and keeps no rows at all when keepOnly is 0.
scala> duplicates.filter($"rowid" <= keepOnly).drop("rowid").show(false)
+--------+---+
|name |age|
+--------+---+
|Srinivas|1 |
|Srinivas|1 |
|Srinivas|1 |
|Srinivas|1 |
|Kumar |3 |
|Ravi |2 |
+--------+---+