Используя следующую логику, вы получите желаемый результат.
Функция
Window
используется для генерации номера строки для каждой группы id
и type
, упорядоченной по s_id
. Сгенерированный номер строки используется для filter
и concat
с type
. Затем, наконец, группировка и поворот должны дать вам желаемый результат
from pyspark.sql import Window
windowSpec = Window.partitionBy("id", "type").orderBy("s_id")
from pyspark.sql import functions as f
df.withColumn("ranks", f.row_number().over(windowSpec))\
.filter(f.col("ranks") < 4)\
.withColumn("type", f.concat(f.col("type"), f.col("ranks")))\
.drop("ranks")\
.groupBy("id")\
.pivot("type")\
.agg(f.first("s_id"))\
.show(truncate=False)
что должно дать вам
+---+--------+--------+--------+----+----+----+
|id |android1|android2|android3|ios1|ios2|ios3|
+---+--------+--------+--------+----+----+----+
|1 |15 |16 |17 |11 |12 |13 |
|2 |18 |null |null |21 |null|null|
+---+--------+--------+--------+----+----+----+
ответ за отредактированную часть
Вам просто нужен дополнительный фильтр как
df.withColumn("ranks", f.row_number().over(windowSpec)) \
.filter(f.col("ranks") < 4) \
.filter(f.col("type") != "") \
.withColumn("type", f.concat(f.col("type"), f.col("ranks"))) \
.drop("ranks") \
.groupBy("id") \
.pivot("type") \
.agg(f.first("s_id")) \
.show(truncate=False)
что даст вам
+---+--------+----+----+
|id |andriod1|ios1|ios2|
+---+--------+----+----+
|1 |15 |11 |12 |
|2 |18 |21 |null|
+---+--------+----+----+
Теперь в этом фрейме данных отсутствуют android2, android3 and ios3
столбцы. Потому что их нет в ваших обновленных входных данных. Вы можете добавить их, используя withColumn API и заполнить нулевыми значениями