I have 2 data frames that I process in pyspark, coming from different sources. These data frames have several columns in common. Here is what I need to do:
- compare the 2 data frames based on dynamically generated columns
- for the matched rows, do further processing based on the column values on the left and on the right
- I also need the unmatched records from both data frames
Here is what I mean:
df1 = [Row(name='Bob', sub_id=1, id=1, age=5, status='active', version=0),
       Row(name='Rob', sub_id=1, id=1, age=5, status='active', version=1),
       Row(name='Tom', sub_id=2, id=3, age=50, status='active', version=0)]
df2 = [Row(name='Bobbie', sub_id=1, age=5),
       Row(name='Tom', sub_id=2, age=51),
       Row(name='Carter', sub_id=3, age=70)]
"""
my keys are based on sub_id; say they are as per below:
sub_id = 1, keys = [sub_id]
sub_id = 2, keys = [sub_id, age]
sub_id = 3, keys = [sub_id]
"""
#matched records expected results
#note that only sub_id=1 has a match based on the keys. After the match and further processing, the version 0 record in df1 is copied as version 2 and updated per df2
df_matched = [
    Row(name='Bobbie', sub_id=1, id=1, age=5, status='active', version=0),  #updated per df2
    Row(name='Rob', sub_id=1, id=1, age=5, status='active', version=1),
    Row(name='Bob', sub_id=1, id=1, age=5, status='active', version=2)  #new insert
]
#unmatched from df1
df_unmatched_left = [Row(name='Rob', sub_id=1, id=1, age=5, status='active', version=1)]
#unmatched from df2
df_unmatched_right = [Row(name='Tom', sub_id=2, age=51),
                      Row(name='Carter', sub_id=3, age=70)]
#Here is what I have tried so far
#created temp views of df1 and df2, say df1_table, df2_table
#but how do I make the joins dynamic, and how do I do the further processing, for which I need both df1 and df2?
df_matched = spark.sql("sql to join df1_table and df2_table").select("select from both dfs")  #use some function with map for further processing?
#to build the join condition, would using map and adding a temp column with the join condition to use in the query be a good approach?
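For example, is something along these lines a reasonable direction? It is only a rough, untested sketch using the DataFrame API instead of spark.sql; keys_by_sub_id is the placeholder dict from above, and I am assuming sub_id is never null on either side:

from functools import reduce
from pyspark.sql import functions as F

left = df1.alias('l')
right = df2.alias('r')

#one AND clause per sub_id: the left row belongs to that sub_id and all of its key columns match
clauses = [
    reduce(lambda a, b: a & b,
           [F.col('l.sub_id') == sid] + [F.col('l.' + k) == F.col('r.' + k) for k in cols])
    for sid, cols in keys_by_sub_id.items()
]
#the final join condition is the OR of the per-sub_id clauses
join_cond = reduce(lambda a, b: a | b, clauses)

joined = left.join(right, join_cond, 'full_outer')

#split into matched / unmatched by checking which side is null after the full outer join
df_matched_raw = joined.filter(F.col('l.sub_id').isNotNull() & F.col('r.sub_id').isNotNull())
df_unmatched_left = joined.filter(F.col('r.sub_id').isNull()).select('l.*')
df_unmatched_right = joined.filter(F.col('l.sub_id').isNull()).select('r.*')

#df_matched_raw still carries both sides' columns, so the version-copy / update logic
#could presumably be done here with withColumn / when, or by mapping over the joined rows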
I am using pyspark 2.3.x and python 3.5.x.