There are many approaches; here is one that can do the work in parallel:
import org.apache.spark.sql.functions._
import spark.implicits._
val origDF = sc.parallelize(Seq(
("1", "a", "b"),
("2", "c", "d"),
("3", "e", "f")
)).toDF("k", "v1", "v2")
val newDF = sc.parallelize(Seq(
("1", "a", "b"),
("2", "c2", "d"),
("4", "g", "h")
)).toDF("k", "v1", "v2")
val df1 = origDF.except(newDF) // rows in origDF but not in newDF; if k is also absent from df2, the row was deleted
//df1.show(false)
val df2 = newDF.except(origDF) // rows in newDF but not in origDF; if k is also absent from df1, the row was added
//df2.show(false)
// rows that appear in neither df1 nor df2 are unchanged
// if k exists in both df1 and df2, the row in df2 is the modified version
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
val df3 = spark.sql("""SELECT df1.k, df1.v1, df1.v2, "deleted" as operation
FROM df1
WHERE NOT EXISTS (SELECT df2.k
FROM df2
WHERE df2.k = df1.k)
UNION
SELECT df2.k, df2.v1, df2.v2, "added" as operation
FROM df2
WHERE NOT EXISTS (SELECT df1.k
FROM df1
WHERE df1.k = df2.k)
UNION
SELECT df2.k, df2.v1, df2.v2, "modified" as operation
FROM df2
WHERE EXISTS (SELECT df1.k
FROM df1
WHERE df1.k = df2.k)
""")
df3.show(false)
returns:
+---+---+---+---------+
|k |v1 |v2 |operation|
+---+---+---+---------+
|4 |g |h |added |
|2 |c2 |d |modified |
|3 |e |f |deleted |
+---+---+---+---------+
Not too hard, but there is no standard utility for this.
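If you prefer to stay in the DataFrame API, the same diff can be expressed with anti/semi joins instead of temp views and SQL. A minimal sketch, assuming the same origDF/newDF and SparkSession spark as above, and that k is a unique key (under that assumption the joins are equivalent to the NOT EXISTS / EXISTS subqueries):

import org.apache.spark.sql.functions.lit

// "deleted": keys present in origDF but absent from newDF
val deleted = origDF.join(newDF, Seq("k"), "left_anti")
  .withColumn("operation", lit("deleted"))

// rows that are new or changed relative to origDF
val changedOrAdded = newDF.except(origDF)

// "added": keys that do not exist in origDF at all
val added = changedOrAdded.join(origDF, Seq("k"), "left_anti")
  .withColumn("operation", lit("added"))

// "modified": keys present in both, keeping the new values from newDF
// (a semi join returns only the left side's columns)
val modified = changedOrAdded.join(origDF, Seq("k"), "left_semi")
  .withColumn("operation", lit("modified"))

val diff = deleted.union(added).union(modified)
diff.show(false)

This produces the same three-row result as the SQL version, without registering temp views.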