Объединять и заменять элементы двух информационных фреймов с помощью PySpark - PullRequest
0 голосов
/ 17 апреля 2020

У меня есть два фрейма данных:

DF1:

╔═══════╦═════╦═════════╦════╗
║ Name  ║ Age ║ Address ║ Id ║
╠═══════╬═════╬═════════╬════╣
║ test1 ║ 20  ║  ls     ║ 10 ║
╠═══════╬═════╬═════════╬════╣
║ test2 ║     ║ baz     ║ 15 ║
╠═══════╬═════╬═════════╬════╣
║ test3 ║     ║ az      ║ 19 ║
╚═══════╩═════╩═════════╩════╝

DF2:

╔═══════╦═════╦═════════╦════╗
║ Name  ║ Age ║ Address ║ Id ║
╠═══════╬═════╬═════════╬════╣
║ test4 ║ 20  ║ bas     ║ 10 ║
╠═══════╬═════╬═════════╬════╣
║ test5 ║     ║ baz     ║ 25 ║
╠═══════╬═════╬═════════╬════╣
║ test6 ║ 40  ║ az      ║ 19 ║
╚═══════╩═════╩═════════╩════╝

Результат:

╔═══════╦═════╦═════════╦════╗
║ Name  ║ Age ║ Address ║ Id ║
╠═══════╬═════╬═════════╬════╣
║ test1 ║ 20  ║ ls      ║ 10 ║
╠═══════╬═════╬═════════╬════╣
║ test2 ║ 40  ║  az     ║ 19 ║
╚═══════╩═════╩═════════╩════╝

Что Я хочу добиться: 1. Когда идентификатор совпадает в обоих кадрах, он должен учитывать только эту запись в выводе. 2. Все столбцы этой соответствующей записи должны быть заменены столбцами DF1. 3. Если столбец DF1 пуст и данные присутствуют в DF2, он не должен заменять его.

Попытки соединения также:

DF3 = DF1.join(DF2, [DF1.Id == DF2.Id], 'inner')
DF3.show()

Результат:

Name,Age,Adress,Id,Name,Age,Adress,Id
test1,20,ls,10,test5,20,bas,10

Если я использую

DF3 = DF1.join(DF2, [DF1.Id == DF2.Id], 'leftsemi')
DF3.show()

Это дает мне данные из DF1 и не добавляет пропущенные значения из D2.

Попытка достичь ниже:

for i in df2.columns:
    df2 = df2.withColumn(i, when(df1.Id == col("Id") & (col(i) == ""), df1(i)).otherwise(col(i)))
df2.show()

1 Ответ

0 голосов
/ 17 апреля 2020

Outer Join будет хранить записи из обеих таблиц вместе со связанными нулевыми значениями в соответствующих левой / правой таблицах. Данные слева и справа можно сравнивать (проверьте, имеет ли значение null) и заменить их ненулевыми значениями.

from pyspark.sql import SparkSession
from pyspark.sql.functions import when

spark = SparkSession.builder.getOrCreate()

ds1 = [
    {'Name': 'test1', 'Age': 20, 'Address': 'ls', 'Id': 10},
    {'Name': 'test2', 'Age': None, 'Address': 'baz', 'Id': 15},
    {'Name': 'test3', 'Age': None, 'Address': 'az', 'Id': 19},
]

ds2 = [
    {'Name': 'test4', 'Age': 20, 'Address': 'az', 'Id': 10},
    {'Name': 'test5', 'Age': None, 'Address': 'az', 'Id': 25},
    {'Name': 'test6', 'Age': 40, 'Address': 'az', 'Id': 19},
]

df1 = spark.createDataFrame(ds1)
df2 = spark.createDataFrame(ds2)

df1.show()

+-------+----+---+-----+
|Address| Age| Id| Name|
+-------+----+---+-----+
|     ls|  20| 10|test1|
|    baz|null| 15|test2|
|     az|null| 19|test3|
+-------+----+---+-----+

df2.show()

+-------+----+---+-----+
|Address| Age| Id| Name|
+-------+----+---+-----+
|     az|  20| 10|test4|
|     az|null| 25|test5|
|     az|  40| 19|test6|
+-------+----+---+-----+

join_by_col = 'Id'

df_ = df1.join(df2, on=[join_by_col], how='outer').orderBy(join_by_col)

df_.show()

+---+-------+----+-----+-------+----+-----+
| Id|Address| Age| Name|Address| Age| Name|
+---+-------+----+-----+-------+----+-----+
| 10|     ls|  20|test1|     az|  20|test4|
| 15|    baz|null|test2|   null|null| null|
| 19|     az|null|test3|     az|  40|test6|
| 25|   null|null| null|     az|null|test5|
+---+-------+----+-----+-------+----+-----+

for col in df1.columns:
    if col != join_by_col:
        col_ = col + '_'
        df_ = df_.withColumn(
            col_, when(df1[col].isNull(), df2[col]).otherwise(df1[col])).drop(col)

df_.show()

+---+--------+----+-----+
| Id|Address_|Age_|Name_|
+---+--------+----+-----+
| 10|      ls|  20|test1|
| 15|     baz|null|test2|
| 19|      az|  40|test3|
| 25|      az|null|test5|
+---+--------+----+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...