В приведенном ниже коде не удалось захватить записи с нулевым значением.Снизу df1, столбец НЕТ.5 имеет нулевое значение (поле имени).
В соответствии с моим нижеприведенным требованием OutputDF, запись № 5 должна идти, как указано.Но после выполнения кода ниже эта запись не поступает в окончательный вывод.Записи с нулевыми значениями не поступают в вывод.Кроме этого, остальное все нормально.
df1
NO DEPT NAME SAL
1 IT RAM 1000
2 IT SRI 600
3 HR GOPI 1500
5 HW 700
df2
NO DEPT NAME SAL
1 IT RAM 1000
2 IT SRI 900
4 MT SUMP 1200
5 HW MAHI 700
OutputDF
NO DEPT NAME SAL FLAG
1 IT RAM 1000 SAME
2 IT SRI 900 UPDATE
4 MT SUMP 1200 INSERT
3 HR GOPI 1500 DELETE
5 HW MAHI 700 UPDATE
from pyspark.shell import spark
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
sc = spark.sparkContext
filedf1 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file1.csv")
filedf2 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file2.csv")
filedf1.createOrReplaceTempView("table1")
filedf2.createOrReplaceTempView("table2")
df1 = spark.sql( "select * from table1" )
df2 = spark.sql( "select * from table2" )
#DELETE
df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('DELETE').alias('FLAG'))
print("df_d left:",df_d.show())
#INSERT
df_i = df1.join(df2, df1.NO == df2.NO, 'right').filter(F.isnull(df1.NO)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('INSERT').alias('FLAG'))
print("df_i right:",df_i.show())
#SAME
df_s = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) == F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('SAME').alias('FLAG'))
print("df_s inner:",df_s.show())
#UPDATE
df_u = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) != F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('UPDATE').alias('FLAG'))
print("df_u inner:",df_u.show())
df = df_d.union(df_i).union(df_s).union(df_u)
df.show()
Здесь я сравниваю и df1, и df2, если обнаружены новые записи в df2, принимая флаг как INSERT, если запись одинакова в обоих dfs, затем принимает как SAME, если запись находится в df1 ине в df2, принимая в качестве DELETE, и если запись существует в обоих dfs, но с другими значениями, тогда принимает значения df2 в качестве UPDATE.