You can find my solution below. I build four DataFrames for the SAME / UPDATE / INSERT / DELETE cases and then union them:
>>> from functools import reduce
>>> from pyspark.sql import DataFrame
>>> import pyspark.sql.functions as F
>>> df1 = sc.parallelize([
... (1,'IT','RAM',1000),
... (2,'IT','SRI',600),
... (3,'HR','GOPI',1500),
... (5,'HW','MAHI',700)
... ]).toDF(['NO','DEPT','NAME','SAL'])
>>> df1.show()
+---+----+----+----+
| NO|DEPT|NAME| SAL|
+---+----+----+----+
|  1|  IT| RAM|1000|
|  2|  IT| SRI| 600|
|  3|  HR|GOPI|1500|
|  5|  HW|MAHI| 700|
+---+----+----+----+
>>> df2 = sc.parallelize([
... (1,'IT','RAM',1000),
... (2,'IT','SRI',900),
... (4,'MT','SUMP',1200),
... (5,'HW','MAHI',700)
... ]).toDF(['NO','DEPT','NAME','SAL'])
>>> df2.show()
+---+----+----+----+
| NO|DEPT|NAME| SAL|
+---+----+----+----+
|  1|  IT| RAM|1000|
|  2|  IT| SRI| 900|
|  4|  MT|SUMP|1200|
|  5|  HW|MAHI| 700|
+---+----+----+----+
# DELETE: keys present in df1 but missing from df2
>>> df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('D').alias('FLAG'))
# INSERT: keys present in df2 but missing from df1
>>> df_i = df1.join(df2, df1.NO == df2.NO, 'right').filter(F.isnull(df1.NO)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('I').alias('FLAG'))
# SAME: matching keys whose values are identical in every column
>>> df_s = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) == F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).\
... select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('S').alias('FLAG'))
# UPDATE: matching keys where a value changed; keep the new values from df2
>>> df_u = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) != F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).\
... select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('U').alias('FLAG'))
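A caveat on the two filters above: F.concat returns NULL as soon as any input column is NULL, so rows containing NULLs would silently fall out of both SAME and UPDATE, and distinct rows can concatenate to the same string ('AB'+'C' equals 'A'+'BC'). If NULLs or such collisions are possible in your data, a column-by-column null-safe comparison is a safer drop-in for both filters; a minimal sketch using Column.eqNullSafe (available since Spark 2.3):
>>> cols = ['NO','DEPT','NAME','SAL']
>>> same_row = reduce(lambda a, b: a & b,
...                   [df1[c].eqNullSafe(df2[c]) for c in cols])
>>> # use .filter(same_row) for SAME and .filter(~same_row) for UPDATE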
>>> dfs = [df_s,df_u,df_d,df_i]
>>> df = reduce(DataFrame.unionAll, dfs)
>>> df.show()
+---+----+----+----+----+
| NO|DEPT|NAME| SAL|FLAG|
+---+----+----+----+----+
|  5|  HW|MAHI| 700|   S|
|  1|  IT| RAM|1000|   S|
|  2|  IT| SRI| 900|   U|
|  3|  HR|GOPI|1500|   D|
|  4|  MT|SUMP|1200|   I|
+---+----+----+----+----+
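If you prefer a single pass over the data, the same flags can also be derived from one full outer join with when/otherwise instead of four separate joins. This is only a sketch of the idea, assuming both DataFrames share the schema above:
>>> cols = ['NO','DEPT','NAME','SAL']
>>> a, b = df1.alias('a'), df2.alias('b')
>>> j = a.join(b, F.col('a.NO') == F.col('b.NO'), 'full')
>>> flag = (F.when(F.col('b.NO').isNull(), 'D')
...          .when(F.col('a.NO').isNull(), 'I')
...          .when((F.col('a.DEPT') == F.col('b.DEPT')) &
...                (F.col('a.NAME') == F.col('b.NAME')) &
...                (F.col('a.SAL') == F.col('b.SAL')), 'S')
...          .otherwise('U'))
>>> picked = [F.coalesce(F.col('b.'+c), F.col('a.'+c)).alias(c) for c in cols]
>>> df_alt = j.select(picked + [flag.alias('FLAG')])
Deleted rows keep their old values through coalesce (the df2 side is NULL for them), while every other row takes the new values from df2, which matches the union result above.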