Я хочу сгруппировать с помощью агрегации фрейм данных pyspark, при удалении дубликатов (сохранить последнее значение) на основе другого столбца этого фрейма данных .
В заключение я хотел бы применить dropDuplicates к объекту GroupedData. Таким образом, для каждой группы я мог бы динамически хранить только одну строку по какому-либо столбцу.
Пример
Прямое агрегирование группы для приведенного ниже кадра данных будет:
from pyspark.sql import functions
dataframe = spark.createDataFrame(
[
(1, "2020-01-01", 1, 1),
(2, "2020-01-01", 2, 1),
(3, "2020-01-02", 1, 1),
(2, "2020-01-02", 1, 1)
],
("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))
# +---+-------------------+-------+---+
# | id| ts|feature| h3|
# +---+-------------------+-------+---+
# | 1|2020-01-01 00:00:00| 1| 1|
# | 2|2020-01-01 00:00:00| 2| 1|
# | 3|2020-01-02 00:00:00| 1| 1|
# | 2|2020-01-02 00:00:00| 1| 1|
# +---+-------------------+-------+---+
aggregated = dataframe.groupby("h3",
functions.window(
timeColumn="ts",
windowDuration="3 days",
slideDuration="1 day",
)
).agg(
functions.sum("feature")
)
aggregated.show(truncate=False)
, приводящий к следующему фрейму данных:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
Проблема
Я хочу, чтобы агрегация использовала только самое последнее состояние каждого id
. В этом случае id=2
было обновлено до feature=1
в ts=2020-01-02 00:00:00
, поэтому все агрегаты с базовой отметкой времени, превышающей 2020-01-02 00:00:00
, должны использовать только это состояние для функции столбца, когда id=2
. Ожидаемый агрегированный фрейм данных:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
Как я могу сделать это с pyspark?
Обновление
Я предположил, что переменная MapType не должна иметь повторяющихся ключей в Spark. , Исходя из этого предположения, я подумал, что могу агрегировать столбец, создавая карту id -> feature
, а затем просто агрегировать значения карты с суммой (или какой бы ни была конечная агрегация).
Итак, я сделал:
aggregated = dataframe.groupby("h3",
functions.window(
timeColumn="ts",
windowDuration="3 days",
slideDuration="1 day",
)
).agg(
functions.map_from_entries(
functions.collect_list(
functions.struct("id","feature")
)
).alias("id_feature")
)
aggregated.show(truncate=False)
Но потом я обнаружил, что карты могут иметь дубликаты ключей:
+---+------------------------------------------+--------------------------------+
|h3 |window |id_feature |
+---+------------------------------------------+--------------------------------+
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2] |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1] |
+---+------------------------------------------+--------------------------------+
, поэтому это не решает мою проблему . Вместо этого я просто нашел другую проблему. При использовании функции отображения в записной книжке Databricks показывает столбец MapType без дублированных клавиш .