Определяемая пользователем функция агрегирования в Spark для реализации процентили - PullRequest
0 голосов
/ 02 сентября 2018

Я пытаюсь написать udaf для вычисления значений percentile.

Мне нужно написать пользовательскую функцию, потому что существующие функции искры percentile_approx, approx_percentile и percentile используют округление не так, как мне нужно.

Мне нужно использовать пол вместо округления средней точки. Могу ли я в любом случае написать это в pyspark?

Если нет, как этого добиться в scala?

Мне нужно вычислить percentile, используя метод ниже:

def percentile_custom(lst, per):
    lst.sorted()
    rank = (len(lst)+1)*per
    ir = math.floor(rank)
    ir1 = math.ceil(rank)
    if (ir == ir1):
        return lst[ir-1]

    else: 
        fr = rank - ir
        ir_qh = lst[ir-1]
        ir_qh1 = lst[ir]
        inter = ((ir_qh1 - ir_qh)*fr) + ir_qh
        return math.floor(inter) 

1 Ответ

0 голосов
/ 03 сентября 2018

Ниже приведена функция для того же, что я написал в pyspark, дайте мне знать, если это не сработало для вас:

from pyspark.sql import Window
import math
import pyspark.sql.types as T
import pyspark.sql.functions as F

def calc_percentile(perc_df, part_col, order_col, p_val=[33,66], num_bins=100, max_bins = 100, perc_col="p_band"):
    """
        Calculate percentile with nimber of bins on specified columns
    """
    win = Window.partitionBy(*part_col).orderBy(order_col)
    def perc_func(col, num, max_bins):
        step = max_bins / num
        return {(p_tile / step): int(
            math.ceil(col * (p_tile / float(max_bins)))
        )  for p_tile in range(step, max_bins + step, step)}
    perc_udf = F.udf(perc_func, T.MapType(T.IntegerType(), T.IntegerType()))
#     perc_df.show()
    rank_data = perc_df.filter(
        F.col(order_col).isNotNull()
    ).withColumn(
        "rank", F.dense_rank().over(win)
    )

    rank_data.persist()
    rank_data.count()

    overall_count_data = rank_data.groupBy(
        *part_col
    ).agg(
        F.max(
            F.col("rank")
        ).alias("count")
    ).select(
        F.explode(
            perc_udf(F.col("count"), F.lit(num_bins), F.lit(max_bins))
        ).alias("n_tile", "rank"), "count",
        *part_col
    )
    overall_count_data.persist()
    overall_count_data.count()
    return overall_count_data.join(
        rank_data, part_col + ["rank"]
    ).withColumn(
        perc_col,
        F.concat(F.lit("P_"), F.col("n_tile").cast("string"))
    ).groupBy(
        *part_col
    ).pivot(
        perc_col, ["P_{0}".format(p_val1) for p_val1 in p_val]
    ).agg(
        F.max(order_col)
    ).select(
        *(
            part_col + [F.col("P_{0}".format(p_val1)) for p_val1 in p_val]
        )
    )
...