Я имел в виду что-то вроде этого:
>>> import pandas as pd
>>> from pyspark.sql.functions import *
>>> from pyspark.sql.window import *
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName('abc').getOrCreate()
>>> data = {"user_id": ["ABC123", "ABC123", "qwerty", "ABC123"], "address": ["yyy,USA", "xxx,USA", "55A,AUS", "zzz,RSA"], "type": ["animal", "animal", "human", "animal"], "count": [2,3,3,4], "country": ["USA", "USA", "AUS", "RSA"]}
>>> df = pd.DataFrame(data=data)
>>> df_pyspark = spark.createDataFrame(df)
>>> w = Window().partitionBy("user_id", "country").orderBy((col("count").desc()))
>>> w2 = Window().partitionBy("user_id").orderBy(col("sum_country").desc())
>>> df_pyspark.select("user_id", "address", "type", "count", "country", sum("count").over(w).alias("sum_country")).select("user_id", first("country").over(w2).alias("top_country"), first("address").over(w).alias("top_address"), "country").where(col("top_country")==col("country")).distinct().show()
+-------+-----------+-----------+-------+
|user_id|top_country|top_address|country|
+-------+-----------+-----------+-------+
| qwerty| AUS| 55A,AUS| AUS|
| ABC123| USA| xxx,USA| USA|
+-------+-----------+-----------+-------+
Вы можете добавить тип, число и т. Д. В зависимости от того, какую логику вы хотите использовать для этого - вы можете сделать то же самое, что и для top_address
(т.е. first
функция), или вы можете groupBy
и agg