Я предполагаю, что каждому player
присваивается id
, и оно не меняется.OP хочет, чтобы результирующий кадр данных содержал score
из наиболее актуальных date
.
# Creating both the DataFrames.
df_A = sqlContext.createDataFrame([(1,'alpha',5,'2018-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_A = df_A.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))
df_B = sqlContext.createDataFrame([(1,'alpha',100,'2019-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_B = df_B.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))
Идея состоит в том, чтобы сделать union () из этих двух кадров данных изатем возьмите distinct
строк.Причиной взятия distinct
строк впоследствии является следующая: предположим, что для player
обновления не было, тогда в B
кадре данных его соответствующие значения будут такими же, как в кадре данных A
.Итак, мы удалили такую duplicates
.
# Importing the requisite packages.
from pyspark.sql.functions import col, max
from pyspark.sql import Window
df = df_A.union(df_B).distinct()
df.show()
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 5|2018-02-13|
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+
Теперь, в качестве последнего шага, используйте функцию Window () , чтобы перебрать объединенный фрейм данных df
и найти latestDate
и отфильтровывать только те строки, где date
совпадает с latestDate
.Таким образом, все те строки, которые соответствуют этим players
, будут удалены там, где произошло обновление (о чем свидетельствует обновленная дата в кадре данных B
).
w = Window.partitionBy('id','player')
df = df.withColumn('latestDate', max('date').over(w))\
.where(col('date') == col('latestDate')).drop('latestDate')
df.show()
+---+------+-----+----------+
| id|player|score| date|
+---+------+-----+----------+
| 1| alpha| 100|2019-02-13|
| 2| beta| 6|2018-02-13|
+---+------+-----+----------+