Вы можете использовать функции окна.
partitionBy("group1", "group2")
дает вам группы со всеми комбинациями столбцов. orderBy("position")
упорядочивает строки внутри каждой из групп. row_number()
дает номера строк внутри каждой группы в соответствии с порядком.В зависимости от логики, может быть лучше использовать другие функции (например, rank()
, dense_rank()
)
После этого у вас есть столбец "row_num" с количеством позиций внутри каждогогруппы, и вы можете фильтровать свои данные.
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql import Window
df = spark.createDataFrame([
("G1_A", "G2_P", "0001", 6, 0.3217543124839014),
("G1_A", "G2_P", "0002", 6, 0.4554057162820776),
("G1_A", "G2_P", "0003", 8, 0.3801357655062654),
("G1_A", "G2_P", "0004", 1, 0.8910865867971118),
("G1_A", "G2_P", "0005", 5, 0.04929044804190086),
("G1_A", "G2_Q", "0001", 7, 0.10188969920834146),
("G1_A", "G2_Q", "0002", 9, 0.4900202258755447),
("G1_A", "G2_Q", "0003", 9, 0.0570759385425319),
("G1_A", "G2_Q", "0004", 0, 0.8638132568329479),
("G1_A", "G2_Q", "0005", 8, 0.5631513545869068),
("G1_A", "G2_R", "0001", 2, 0.18320914601531957),
("G1_A", "G2_R", "0002", 0, 0.722470705002637),
("G1_A", "G2_R", "0003", 6, 0.27988540796939354),
("G1_A", "G2_R", "0004", 7, 0.13827103885498537),
("G1_A", "G2_R", "0005", 6, 0.8410548211059407),
("G1_A", "G2_S", "0001", 1, 0.6542679700270546),
("G1_A", "G2_S", "0002", 9, 0.8858848000834335),
("G1_A", "G2_S", "0003", 7, 0.5113964766224457),
("G1_A", "G2_S", "0004", 9, 0.7758283878692317),
("G1_A", "G2_S", "0005", 4, 0.011421066938733127),
("G1_B", "G2_P", "0001", 1, 0.6098855780360801),
("G1_B", "G2_P", "0002", 8, 0.009644056732163175),
("G1_B", "G2_P", "0003", 1, 0.9216012386238513),
("G1_B", "G2_P", "0004", 7, 0.8658947151731069),
("G1_B", "G2_P", "0005", 7, 0.8018548921412443),
("G1_B", "G2_Q", "0001", 7, 0.670090542740813),
("G1_B", "G2_Q", "0002", 6, 0.5051134978717621),
("G1_B", "G2_Q", "0003", 1, 0.16873516416387302),
("G1_B", "G2_Q", "0004", 8, 0.7750478028867812),
("G1_B", "G2_Q", "0005", 6, 0.9857364635291703),
("G1_B", "G2_R", "0001", 8, 0.8956034505498771),
("G1_B", "G2_R", "0002", 5, 0.9537748989951761),
("G1_B", "G2_R", "0003", 0, 0.14952641909752684),
("G1_B", "G2_R", "0004", 9, 0.3728857754552449),
("G1_B", "G2_R", "0005", 8, 0.55145790830298),
("G1_B", "G2_S", "0001", 9, 0.5261231425475038),
("G1_B", "G2_S", "0002", 6, 0.6789322931505193),
("G1_B", "G2_S", "0003", 2, 0.9682503963857059),
("G1_B", "G2_S", "0004", 4, 0.21506064374959122),
("G1_B", "G2_S", "0005", 4, 0.5521363246845827)],
["group1", "group2", "position", "value1", "value2"])
window_spec = Window \
.partitionBy("group1", "group2") \
.orderBy("position")
res = df \
.withColumn("row_num", row_number().over(window_spec)) \
.where(col("row_num") <= 3) \
.drop("row_num")
res.show()
Вывод:
+------+------+--------+------+--------------------+
|group1|group2|position|value1| value2|
+------+------+--------+------+--------------------+
| G1_A| G2_P| 0001| 6| 0.3217543124839014|
| G1_A| G2_P| 0002| 6| 0.4554057162820776|
| G1_A| G2_P| 0003| 8| 0.3801357655062654|
| G1_A| G2_Q| 0001| 7| 0.10188969920834146|
| G1_A| G2_Q| 0002| 9| 0.4900202258755447|
| G1_A| G2_Q| 0003| 9| 0.0570759385425319|
| G1_A| G2_R| 0001| 2| 0.18320914601531957|
| G1_A| G2_R| 0002| 0| 0.722470705002637|
| G1_A| G2_R| 0003| 6| 0.27988540796939354|
| G1_A| G2_S| 0001| 1| 0.6542679700270546|
| G1_A| G2_S| 0002| 9| 0.8858848000834335|
| G1_A| G2_S| 0003| 7| 0.5113964766224457|
| G1_B| G2_P| 0001| 1| 0.6098855780360801|
| G1_B| G2_P| 0002| 8|0.009644056732163175|
| G1_B| G2_P| 0003| 1| 0.9216012386238513|
| G1_B| G2_Q| 0001| 7| 0.670090542740813|
| G1_B| G2_Q| 0002| 6| 0.5051134978717621|
| G1_B| G2_Q| 0003| 1| 0.16873516416387302|
| G1_B| G2_R| 0001| 8| 0.8956034505498771|
| G1_B| G2_R| 0002| 5| 0.9537748989951761|
+------+------+--------+------+--------------------+