I used this approach with Spark SQL to solve your question:
import spark.implicits._
val dataA = Seq( ("A", 20200201,"X"), ("B",20200301, "Y"))
val dataB = Seq( ("A", 10, 20191230), ("A",5, 20200310), ("B", 20, 20200220), ("B", 10, 20200130))
val dfA = dataA.toDF("product", "date", "id")
val dfB = dataB.toDF("product", "value", "date")
dfA.createOrReplaceTempView("ta")
dfB.createOrReplaceTempView("tb")
spark.sql(
"""
|WITH filt AS (
|SELECT DISTINCT a.product, a.id, b.value,
|RANK() OVER(PARTITION BY a.product ORDER BY b.date DESC) AS rnk
|FROM ta AS a
|JOIN tb AS b ON a.product = b.product
|WHERE a.date > b.date)
|SELECT filt.product, filt.id, filt.value
| FROM filt
| WHERE filt.rnk = 1
|""".stripMargin).show(truncate = false)
with the expected result:
+-------+---+-----+
|product|id |value|
+-------+---+-----+
|B |Y |20 |
|A |X |10 |
+-------+---+-----+
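If you prefer the DataFrame API over a temp view, the same logic can be sketched like this (an equivalent of the query above, assuming the same dfA and dfB; the renamed columns bproduct/bdate are just to avoid ambiguity after the join):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rename dfB's columns so they don't collide with dfA's after the join
val b = dfB
  .withColumnRenamed("product", "bproduct")
  .withColumnRenamed("date", "bdate")

// Keep only b rows for the same product with an earlier date
val joined = dfA.join(b, dfA("product") === b("bproduct") && dfA("date") > b("bdate"))

// Rank each product's candidates by b.date descending and take the latest one
val w = Window.partitionBy($"product").orderBy($"bdate".desc)
joined.withColumn("rnk", rank().over(w))
  .where($"rnk" === 1)
  .select("product", "id", "value")
  .distinct()
  .show(false)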
Hope this helps.
Regards.