Как сгруппировать по распределению в спарк? - PullRequest
0 голосов
/ 07 ноября 2018

Я ищу способ группировки по столбцу (значения int) на основе распределений. Например ,::1001*

Df =

col1  col2
1a      10
2a      120 
3a      3 
4a      10000 
5a      10 

Я пытаюсь получить дистрибутивы вроде (0, 1, 5, 10, 50, 100, 10000):

distribution(lesser than or equal)           count
0                                               0
1                                               0
5                                               1
10                                              2
50                                              3
100                                             3
10000                                           5

Есть ли простой способ сделать это с помощью любых заранее определенных математических функций в Spark?

1 Ответ

0 голосов
/ 07 ноября 2018

Итак, во-первых, мы могли бы вычислить процентили целевого столбца. Например, скажем, мы вычисляем 1, 5, 10, 25, 50, 75, 90, 95, 99, 100 квантилей.

val df = Seq(1, 1, 2, 3, 5, 5, 5, 9, 9, 10, 12, 13, 15, 15, 16, 20, 200, 201, 205).toDF
val quantiles = List(0.01, 0.05, 0.10, 0.25, 0.50, 0.75, 0.90, 0.95, 0.99)
val quantsGenerator = for {
    a <- quantiles
  } yield callUDF("percentile_approx", col("value"), lit(a)).as("q" + a.toString)
val quantilesComputed = df.agg(count("*").as("count"), quantsGenerator: _*).drop("count")

Итак, в этом первом фрагменте я инициализирую свой список DF и квантилей. Для каждого квантиля я создаю столбец, который будет вычислением процентиля в столбце. Моя переменная quantsGenerator содержит выражение каждого столбца, который я буду применять в функции agg.

Функция agg кодируется в исходном коде Spark так, как вам нужно, чтобы сначала иметь один столбец (поэтому здесь я использую count ("*") ), а затем вы можете задать переменные к функции agg , которая здесь будет нашим quantsGenerator. Счет ("*") бесполезен, просто для того, чтобы мы могли дать переменные функции agg . Так что вы можете бросить его позже.

Как только вы получите это, мы можем сгенерировать окончательный фрейм данных, который будет "La fonction de répartition", как мы бы назвали его по-французски: D

val quantsAsArrayDouble = quantilesComputed.collect.map(x => x.toSeq.asInstanceOf[Seq[Double]).flatten
val whenFunctions = quantsAsArrayDouble.map(x => sum(when(col("value") <= x.toDouble, 1)).as("<=" + x.toString))
val finalDf = df.agg(count(lit(1)).as("count"), whenFunctions: _*).drop("count")

Опять этот счет ("*") ... О нет! считать (горит (1)), это точно так же. Он интерпретируется как count ("*") и в нашем случае полезен для изменения функции whenFunctions ... grrrr

И, в конце концов, вы получаете то, что вам может понадобиться.

scala> df.agg(count("*"),c:_*).show
+-----+-----+-----+-----+------+------+-------+-------+-------+-------+
|<=1.0|<=1.0|<=1.0|<=5.0|<=10.0|<=16.0|<=201.0|<=205.0|<=205.0|<=205.0|
+-----+-----+-----+-----+------+------+-------+-------+-------+-------+
|    2|    2|    2|    7|    10|    15|     18|     19|     19|     19|
+-----+-----+-----+-----+------+------+-------+-------+-------+-------+

Несколько одинаковых значений повторяются, потому что у меня недостаточно данных. Но я уверен, что в вашем случае это будет хорошо работать с большим количеством данных. Вы можете выбрать процентили, которые вы хотите, изменив первый список.

Удачи

PS: Я только что увидел, как в предыдущем посте было опубликовано, что в Spark существует объект Bucketizer. Даааа, это может быть более эффективным, чем моя техника х) Мой плохой! Веселитесь вместе с Spark

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...