Создайте отчет о несоответствии столбцов между двумя фреймами данных Pyspark. - PullRequest
0 голосов
/ 07 января 2020

Команда, у нас есть требование генерировать отчет о несовпадающих столбцах на основе ключевого поля между 2 кадрами данных Pyspark точно такой же структуры.

Вот первый кадр данных -

>>> df.show()
+--------+----+----+----+----+----+----+----+----+
|     key|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+
|    abcd| 123| xyz|   a|  ab| abc| def| qew| uvw|
|   abcd1| 123| xyz|   a|  ab| abc| def| qew| uvw|
|  abcd12| 123| xyz|   a|  ab| abc| def| qew| uvw|
| abcd123| 123| xyz|   a|  ab| abc| def| qew| uvw|
|abcd1234| 123| xyz|   a|  ab| abc| def| qew| uvw|
+--------+----+----+----+----+----+----+----+----+

И вот 2-й кадр данных-

>>> df1.show()
+--------+----+----+----+----+----+----+----+----+
|     key|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+
|    abcd| 123| xyz|   a|  ab| abc| def| qew| uvw|
|   abcdx| 123| xyz|   a|  ab| abc| def| qew| uvw|
|  abcd12| 123| xyz|   a| abx| abc|defg| qew| uvw|
| abcd123| 123| xyz|   a|  ab| abc|defg| qew| uvw|
|abcd1234| 123| xyz|   a|  ab|abcd|defg| qew| uvw|
+--------+----+----+----+----+----+----+----+----+

Полное внешнее соединение дает мне это-

>>> dfFull=df.join(df1,'key','outer')
>>> dfFull.show()
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|     key|col1|col2|col3|col4|col5|col6|col7|col8|col1|col2|col3|col4|col5|col6|col7|col8|
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  abcd12| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a| abx| abc|defg| qew| uvw|
|   abcd1| 123| xyz|   a|  ab| abc| def| qew| uvw|null|null|null|null|null|null|null|null|
|abcd1234| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a|  ab|abcd|defg| qew| uvw|
| abcd123| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a|  ab| abc|defg| qew| uvw|
|   abcdx|null|null|null|null|null|null|null|null| 123| xyz|   a|  ab| abc| def| qew| uvw|
|    abcd| 123| xyz|   a|  ab| abc| def| qew| uvw| 123| xyz|   a|  ab| abc| def| qew| uvw|
+--------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+

Если я просто посмотрю на col6, есть 5 значений, которые не совпадают для поля "ключ" ( единственное значение соответствует последней записи).

>>> dfFull.select('key',df['col6'],df1['col6']).show()
+--------+----+----+
|     key|col6|col6|
+--------+----+----+
|  abcd12| def|defg|
|   abcd1| def|null|
|abcd1234| def|defg|
| abcd123| def|defg|
|   abcdx|null| def|
|    abcd| def| def|
+--------+----+----+

Мне нужно создать отчет о чем-то подобном для всех столбцов. Примером несоответствия может быть любое значение записи из фреймов данных.

colName,NumofMismatch,mismatchSampleFromDf,misMatchSamplefromDf1
col6,5,def,defg
col7,2,null,qew
col8,2,null,uvw
col5,3,null,abc

Это сводка по столбцам, основанная на ключе, указывающая, сколько значений не совпадают между двумя фреймами данных.

Sid

1 Ответ

2 голосов
/ 08 января 2020

Допустим, что два кадра данных: df1 и df2 , вы можете попробовать следующее:

from pyspark.sql.functions import when, array, count, first

# list of columns to be compared
cols = df1.columns[1:]

df_new = (df1.join(df2, "key", "outer")
    .select([ when(~df1[c].eqNullSafe(df2[c]), array(df1[c], df2[c])).alias(c) for c in cols ])
    .selectExpr('stack({},{}) as (colName, mismatch)'.format(len(cols), ','.join('"{0}",`{0}`'.format(c) for c in cols)))
    .filter('mismatch is not NULL'))

df_new.show(10)
+-------+-----------+                                                           
|colName|   mismatch|
+-------+-----------+
|   col4|  [ab, abx]|
|   col6|[def, defg]|
|   col6|[def, defg]|
|   col5|[abc, abcd]|
|   col6|[def, defg]|
|   col1|    [, 123]|
|   col2|    [, xyz]|
|   col3|      [, a]|
|   col4|     [, ab]|
|   col5|    [, abc]|
+-------+-----------+

Примечания: (1) условие ~df1[c].eqNullSafe(df2[c]), используемое для нахождения несоответствий, удовлетворяет одному из следующих условий:

+ df1[c] != df2[c]
+ df1[c] is NULL or df2[c] is NULL but not both

(2) Несоответствия, если они существуют, сохраняются как столбец ArrayType с первым элементом из df1 и 2-й предмет из df2 . Значение NULL возвращается, если нет несоответствия, а затем отфильтровывается.

(3) Функция stack () , динамически генерируемая функциями формата Python, выглядит следующим образом:

stack(8,"col1",`col1`,"col2",`col2`,"col3",`col3`,"col4",`col4`,"col5",`col5`,"col6",`col6`,"col7",`col7`,"col8",`col8`) as (colName, mismatch)

После того, как у нас есть df_new, мы можем выполнить агрегацию по группам +:

df_new.groupby('colName') \
    .agg(count('mismatch').alias('NumOfMismatch'), first('mismatch').alias('mismatch')) \
    .selectExpr('colName', 'NumOfMismatch', 'mismatch[0] as misMatchFromdf1', 'mismatch[1] as misMatchFromdf2')
    .show()
+-------+-------------+---------------+---------------+
|colName|NumOfMismatch|misMatchFromdf1|misMatchFromdf2|
+-------+-------------+---------------+---------------+
|   col8|            2|           null|            uvw|
|   col3|            2|           null|              a|
|   col4|            3|             ab|            abx|
|   col1|            2|           null|            123|
|   col6|            5|            def|           defg|
|   col5|            3|            abc|           abcd|
|   col2|            2|           null|            xyz|
|   col7|            2|           null|            qew|
+-------+-------------+---------------+---------------+
...