pyspark объединяет два Dataframe и сохраняет строки до последней даты - PullRequest
0 голосов
/ 06 марта 2019

У меня есть два кадра данных A и B.

A

+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|    5|2018-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

B

+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

и я должен создать новый Dataframe, в котором счет обновляется, просматривая дату

результат

+---+------+-----+----------+
|id |player|score|date      |
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

Ответы [ 2 ]

1 голос
/ 06 марта 2019

Вы можете объединить два кадра данных и использовать pyspark.sql.functions.when(), чтобы выбрать значения для столбцов score и date.

from pyspark.sql.functions import col, when

df_A.alias("a").join(df_B.alias("b"), on=["id", "player"], how="inner")\
    .select(
        "id", 
        "player", 
        when(
            col("b.date") > col("a.date"), 
            col("b.score")
        ).otherwise(col("a.score")).alias("score"),
        when(
            col("b.date") > col("a.date"), 
            col("b.date")
        ).otherwise(col("a.date")).alias("date")
    )\
    .show()
#+---+------+-----+----------+
#| id|player|score|      date|
#+---+------+-----+----------+
#|  1| alpha|  100|2019-02-13|
#|  2|  beta|    6|2018-02-13|
#+---+------+-----+----------+

Подробнее о when: Эквивалент искр IF, затем ELSE

0 голосов
/ 06 марта 2019

Я предполагаю, что каждому player присваивается id, и оно не меняется.OP хочет, чтобы результирующий кадр данных содержал score из наиболее актуальных date.

# Creating both the DataFrames.
df_A = sqlContext.createDataFrame([(1,'alpha',5,'2018-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_A = df_A.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))

df_B = sqlContext.createDataFrame([(1,'alpha',100,'2019-02-13'),(2,'beta',6,'2018-02-13')],('id','player','score','date'))
df_B = df_B.withColumn('date',to_date(col('date'), 'yyyy-MM-dd'))

Идея состоит в том, чтобы сделать union () из этих двух кадров данных изатем возьмите distinct строк.Причиной взятия distinct строк впоследствии является следующая: предположим, что для player обновления не было, тогда в B кадре данных его соответствующие значения будут такими же, как в кадре данных A.Итак, мы удалили такую ​​duplicates.

# Importing the requisite packages.
from pyspark.sql.functions import col, max
from pyspark.sql import Window
df = df_A.union(df_B).distinct()
df.show()
+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|    5|2018-02-13|
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+

Теперь, в качестве последнего шага, используйте функцию Window () , чтобы перебрать объединенный фрейм данных df и найти latestDate и отфильтровывать только те строки, где date совпадает с latestDate.Таким образом, все те строки, которые соответствуют этим players, будут удалены там, где произошло обновление (о чем свидетельствует обновленная дата в кадре данных B).

w = Window.partitionBy('id','player')
df = df.withColumn('latestDate', max('date').over(w))\
       .where(col('date') == col('latestDate')).drop('latestDate')
df.show()
+---+------+-----+----------+
| id|player|score|      date|
+---+------+-----+----------+
|  1| alpha|  100|2019-02-13|
|  2|  beta|    6|2018-02-13|
+---+------+-----+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...