Another approach uses window functions: union the two DataFrames (the marker column c keeps their rows in separate partitions), rank scores within each (c, id) partition in descending order, and keep the dfb rows with rank 2, i.e. the second-highest score per id.
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> val dfa = Seq(("A","13"),("B","24"),("C","15")).toDF("id","topscore").withColumn("topscore",'topscore.cast("int")).withColumn("c",lit("a"))
dfa: org.apache.spark.sql.DataFrame = [id: string, topscore: int, c: string]
scala> val dfb = Seq(("A","6"), ("A","3"), ("A","18"), ("A","8"), ("B","8"), ("B","18"), ("B","26"), ("B","12"), ("C","1"), ("C","4"), ("C","20"), ("C","9")).toDF("id","score").withColumn("score",'score.cast("int")).withColumn("c",lit("b"))
dfb: org.apache.spark.sql.DataFrame = [id: string, score: int, c: string]
scala> dfa.union(dfb).withColumn("x", rank().over(Window.partitionBy('c, 'id).orderBy('topscore.desc))).filter('c === "b" && 'x === 2).show
+---+--------+---+---+
| id|topscore| c| x|
+---+--------+---+---+
| A| 8| b| 2|
| B| 18| b| 2|
| C| 9| b| 2|
+---+--------+---+---+
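One caveat (an assumption about your real data, since the sample above has no duplicates): with tied scores, rank can skip 2 entirely, because two rows tying at rank 1 push the next row to rank 3. If ties are possible, dense_rank is the safer choice; for this sample it produces the same output:
scala> dfa.union(dfb).withColumn("x", dense_rank().over(Window.partitionBy('c, 'id).orderBy('topscore.desc))).filter('c === "b" && 'x === 2).show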