I don't think there is a shortcut for your problem. Please find my solution below.
//Inputs:
import spark.implicits._ // needed for toDF when not running in the spark-shell
val df1 = Seq((2014,"CT",343477),(2014,"DE",123431),(2014,"MD",558686),(2014,"NJ",773321),
  (2015,"CT",343477),(2015,"DE",123431),(2015,"MD",558686),(2015,"NJ",773321)).toDF("year","state","count")
val df2 = Seq(343477,123431,558686,773321,343477,123431,558686,773321).toDF("count_2")
//Solution:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// Each value appears twice in df2, so the join matches every df1 row twice;
// row_number over a window keyed by (year, state, count) keeps one copy per key.
val winFun = Window.partitionBy("year", "state", "count").orderBy("year")
df1.join(df2, df1("count") === df2("count_2"))
  .withColumn("row_no", row_number().over(winFun))
  .filter("row_no = 1").drop("row_no")
  .orderBy("year").show()
Sample output:
+----+-----+------+-------+
|year|state| count|count_2|
+----+-----+------+-------+
|2014| DE|123431| 123431|
|2014| MD|558686| 558686|
|2014| CT|343477| 343477|
|2014| NJ|773321| 773321|
|2015| MD|558686| 558686|
|2015| DE|123431| 123431|
|2015| CT|343477| 343477|
|2015| NJ|773321| 773321|
+----+-----+------+-------+
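As a side note, if the only thing to remove is the duplicate matches caused by the repeated values in df2, a dropDuplicates call on the joined result should produce the same output. This is just a sketch of that simpler variant, reusing the same df1 and df2 as above:

//Alternative sketch (assumes the duplicate join matches are the only rows to drop):
df1.join(df2, df1("count") === df2("count_2"))
  .dropDuplicates()
  .orderBy("year").show()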