Below is a function I wrote in PySpark that does the same thing; let me know if it doesn't work for you:
from pyspark.sql import Window
import math
import pyspark.sql.types as T
import pyspark.sql.functions as F
def calc_percentile(perc_df, part_col, order_col, p_val=[33, 66],
                    num_bins=100, max_bins=100, perc_col="p_band"):
    """
    Calculate percentiles with a given number of bins on the specified columns.
    """
    win = Window.partitionBy(*part_col).orderBy(order_col)

    def perc_func(col, num, max_bins):
        # Map each percentile bin number to the dense rank that closes it.
        # Integer division keeps keys/values consistent with the IntegerType map.
        step = max_bins // num
        return {(p_tile // step): int(
            math.ceil(col * (p_tile / float(max_bins)))
        ) for p_tile in range(step, max_bins + step, step)}

    perc_udf = F.udf(perc_func, T.MapType(T.IntegerType(), T.IntegerType()))

    # Dense-rank the non-null rows within each partition by the ordering column
    rank_data = perc_df.filter(
        F.col(order_col).isNotNull()
    ).withColumn(
        "rank", F.dense_rank().over(win)
    )
    rank_data.persist()
    rank_data.count()

    # Take the highest rank per partition (number of distinct values) and
    # explode it into one row per percentile bin with its bounding rank
    overall_count_data = rank_data.groupBy(
        *part_col
    ).agg(
        F.max(
            F.col("rank")
        ).alias("count")
    ).select(
        F.explode(
            perc_udf(F.col("count"), F.lit(num_bins), F.lit(max_bins))
        ).alias("n_tile", "rank"), "count",
        *part_col
    )
    overall_count_data.persist()
    overall_count_data.count()

    # Join the bin boundaries back to the ranked rows, then pivot so that each
    # requested percentile in p_val becomes a column holding its boundary value
    return overall_count_data.join(
        rank_data, part_col + ["rank"]
    ).withColumn(
        perc_col,
        F.concat(F.lit("P_"), F.col("n_tile").cast("string"))
    ).groupBy(
        *part_col
    ).pivot(
        perc_col, ["P_{0}".format(p_val1) for p_val1 in p_val]
    ).agg(
        F.max(order_col)
    ).select(
        *(
            part_col + [F.col("P_{0}".format(p_val1)) for p_val1 in p_val]
        )
    )