Using Spark Scala SQL
val df = Seq((1,1),(1,2),(1,3),(2,4),(3,5),(4,6),(4,7),(4,8)).toDF("product_id", "review_id")
df.createOrReplaceTempView("review")
spark.sql(
"""with t1 as (
     select product_id, review_id,
            count(*) over (partition by product_id) as c1
     from review)
   select product_id, review_id from t1 where c1 >= 3
""").show(false)
Result:
+----------+---------+
|product_id|review_id|
+----------+---------+
|1 |1 |
|1 |2 |
|1 |3 |
|4 |6 |
|4 |7 |
|4 |8 |
+----------+---------+
Using DataFrame functions to get the same result
import org.apache.spark.sql.expressions.Window
val df = Seq((1,1),(1,2),(1,3),(2,4),(3,5),(4,6),(4,7),(4,8)).toDF("product_id", "review_id")
df.withColumn("cn", count('product_id).over(Window.partitionBy('product_id)))
  .filter("cn >= 3")
  .drop("cn")
  .show(false)
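If you'd rather avoid window functions altogether, the same filter can be expressed as a groupBy-count followed by a left-semi join. A minimal sketch, assuming a local SparkSession (the `frequent` name is just for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1,1),(1,2),(1,3),(2,4),(3,5),(4,6),(4,7),(4,8)).toDF("product_id", "review_id")

// product_ids that have at least 3 reviews
val frequent = df.groupBy("product_id").count().filter($"count" >= 3).select("product_id")

// left_semi keeps only matching rows from df and none of frequent's columns
df.join(frequent, Seq("product_id"), "left_semi").show(false)
```

This needs an extra shuffle for the join, so for this use case the window-function version is usually the more direct choice; the semi-join form can still be handy when the aggregate side is reused elsewhere.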