Как проверить значения NULL при сравнении 2 текстовых файлов с использованием фреймов искровых данных - PullRequest
0 голосов
/ 10 октября 2018

В приведенном ниже коде не удалось захватить записи с нулевым значением.Снизу df1, столбец НЕТ.5 имеет нулевое значение (поле имени).

В соответствии с моим нижеприведенным требованием OutputDF, запись № 5 должна идти, как указано.Но после выполнения кода ниже эта запись не поступает в окончательный вывод.Записи с нулевыми значениями не поступают в вывод.Кроме этого, остальное все нормально.

df1

NO  DEPT NAME   SAL 
1   IT  RAM     1000    
2   IT  SRI     600 
3   HR  GOPI    1500    
5   HW          700

df2

NO  DEPT NAME   SAL 
1   IT   RAM    1000    
2   IT   SRI    900 
4   MT   SUMP   1200    
5   HW   MAHI   700

OutputDF

NO  DEPT NAME    SAL   FLAG
1   IT  RAM     1000   SAME
2   IT  SRI     900    UPDATE
4   MT  SUMP    1200   INSERT
3   HR  GOPI    1500   DELETE
5   HW  MAHI    700    UPDATE

from pyspark.shell import spark
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
sc = spark.sparkContext

filedf1 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file1.csv")
filedf2 = spark.read.option("header","true").option("delimiter", ",").csv("C:\\files\\file2.csv")
filedf1.createOrReplaceTempView("table1")
filedf2.createOrReplaceTempView("table2")
df1 = spark.sql( "select * from table1" )
df2 = spark.sql( "select * from table2" )

#DELETE
df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('DELETE').alias('FLAG'))
print("df_d left:",df_d.show())
#INSERT
df_i = df1.join(df2, df1.NO == df2.NO, 'right').filter(F.isnull(df1.NO)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('INSERT').alias('FLAG'))
print("df_i right:",df_i.show())
#SAME
df_s = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) == F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df1.NO,df1.DEPT,df1.NAME,df1.SAL, F.lit('SAME').alias('FLAG'))
print("df_s inner:",df_s.show())
#UPDATE
df_u = df1.join(df2, df1.NO == df2.NO, 'inner').filter(F.concat(df2.NO,df2.DEPT,df2.NAME,df2.SAL) != F.concat(df1.NO,df1.DEPT,df1.NAME,df1.SAL)).select(df2.NO,df2.DEPT,df2.NAME,df2.SAL, F.lit('UPDATE').alias('FLAG'))
print("df_u inner:",df_u.show())

df = df_d.union(df_i).union(df_s).union(df_u)
df.show()

Здесь я сравниваю и df1, и df2, если обнаружены новые записи в df2, принимая флаг как INSERT, если запись одинакова в обоих dfs, затем принимает как SAME, если запись находится в df1 ине в df2, принимая в качестве DELETE, и если запись существует в обоих dfs, но с другими значениями, тогда принимает значения df2 в качестве UPDATE.

1 Ответ

0 голосов
/ 10 октября 2018

Есть две проблемы с кодом:

  1. Результат нулевого F.concat возвращает ноль, поэтому эта часть в коде отфильтровывает строку строки № 5:

    .filter(F.concat(df2.NO, df2.NAME, df2.SAL) != F.concat(df1.NO, df1.NAME, df1.SAL))
    
  2. Вы выбираете только df2.Это нормально в приведенном выше примере, но если ваш df2 имеет значение NULL, результирующий кадр данных будет иметь значение NULL.

Вы можете попробовать объединить его с помощью udf ниже:

def concat_cols(row):
    concat_row = ''.join([str(col) for col in row if col is not None])
    return concat_row 

udf_concat_cols = udf(concat_cols, StringType())

Функция concat_row может быть разбита на две части:

  1. "". Join ([mylist]) - это строковая функция .Он объединяет все в списке с определенным разделителем, в данном случае это пустая строка.
  2. [str (col) для col в строке, если col не None] - это понимание списка, оно делаеткак гласит: для каждого столбца в строке, если столбец не None, добавьте в список str (col).
    Понимание списка - это просто более питонский способ сделать это:

    mylist = [] 
    for col in row: 
        if col is not None:
            mylist.append(col))
    

Вы можете заменить свой код обновления следующим образом:

df_u = (df1
.join(df2, df1.NO == df2.NO, 'inner')
.filter(udf_concat_cols(struct(df1.NO, df1.NAME, df1.SAL)) != udf_concat_cols(struct(df2.NO, df2.NAME, df2.SAL)))
.select(coalesce(df1.NO, df2.NO), 
        coalesce(df1.NAME, df2.NAME),
        coalesce(df1.SAL, df2.SAL),
        F.lit('UPDATE').alias('FLAG')))

Вы должны сделать что-то подобное для своего флага #SAME и разбить строку для удобства чтения.


Обновление:

Если df2 всегда имеет правильный (обновленный) результат, объединять нет необходимости.Код для этого экземпляра будет:

df_u = (df1
.join(df2, df1.NO == df2.NO, 'inner')
.filter(udf_concat_cols(struct(df1.NO, df1.NAME, df1.SAL)) != udf_concat_cols(struct(df2.NO, df2.NAME, df2.SAL)))
.select(df2.NO,
        df2.NAME,
        df2.SAL,
        F.lit('UPDATE').alias('FLAG')))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...