Каков наилучший способ получить каждую группу в качестве нового кадра данных и передать другую функцию в l oop? - PullRequest
0 голосов
/ 12 марта 2020

Я использую spark- sql -2.4.1v и пытаюсь найти квантили, то есть процентиль 0, процентиль 25 и т. Д. c, в каждом столбце моих данных.

Мои данные:

+----+---------+-------------+----------+-----------+--------+
|  id|     date|      revenue|con_dist_1| con_dist_2| state  |
+----+---------+-------------+----------+-----------+--------+
|  10|1/15/2018|  0.010680705|         6|0.019875458|   TX   |
|  10|1/15/2018|  0.006628853|         4|0.816039063|   AZ   |
|  10|1/15/2018|   0.01378215|         4|0.082049528|   TX   |
|  10|1/15/2018|  0.010680705|         6|0.019875458|   TX   |
|  10|1/15/2018|  0.006628853|         4|0.816039063|   AZ   |
|  10|1/15/2018|   0.01378215|         4|0.082049528|   CA   |
|  10|1/15/2018|  0.010680705|         6|0.019875458|   CA   |
|  10|1/15/2018|  0.006628853|         4|0.816039063|   CA   |
+----+---------+-------------+----------+-----------+--------+

Я бы получил состояния для расчета, т. Е.

val states = Seq("CA","AZ");
val cols = Seq("con_dist_1" ,"con_dist_2")

для каждого данного состояния. Мне нужно получить данные из исходной таблицы и рассчитать процентили только для данные столбцы.

Я пытаюсь, как показано ниже

for( state <- states){

     for( col <- cols){
        // pecentile calculation
     }
}

это слишком медленно, когда группировка по "состоянию" не будет получать другие столбцы, такие как доход, дата и идентификатор .. как получить их?

Как найти квантили по столбцам "con_dist_1" и "con_dist_2" для каждого состояния? Итак, каков наилучший способ, который хорошо масштабируется на кластере?

Каков наилучший способ справиться с этим вариантом использования?

Ожидаемый результат

+-----+---------------+---------------+---------------+---------------+---------------+---------------+
|state|col1_quantile_1|col1_quantile_2|col1_quantile_3|col2_quantile_1|col2_quantile_2|col2_quantile_3|
+-----+---------------+---------------+---------------+---------------+---------------+---------------+
|   AZ|              4|              4|              4|    0.816039063|    0.816039063|    0.816039063|
|   TX|              4|              6|              6|    0.019875458|    0.019875458|    0.082049528|
+-----+---------------+---------------+---------------+---------------+---------------+---------------+

Ответы [ 2 ]

1 голос
/ 12 марта 2020

Возможно, вам придется сделать что-то похожее на приведенный ниже фрагмент кода.

df.groupBy(col("state"))
    .agg(collect_list(col("con_dist_1")).as("col1_quant"), collect_list(col("con_dist_2")).as("col2_quant"))
    .withColumn("col1_quant1", col("col1_quant")(0))
    .withColumn("col1_quant2", col("col1_quant")(1))
    .withColumn("col2_quant1", col("col2_quant")(0))
    .withColumn("col2_quant2", col("col2_quant")(1))
    .show

OutPut:
+-----+----------+--------------------+-----------+-----------+-----------+-----------+
|state|col1_quant|          col2_quant|col1_quant1|col1_quant2|col2_quant1|col2_quant2|
+-----+----------+--------------------+-----------+-----------+-----------+-----------+
|   AZ|    [4, 4]|[0.816039063, 0.8...|          4|          4|0.816039063|0.816039063|
|   CA|    [4, 6]|[0.082049528, 0.0...|          4|          6|0.082049528|0.019875458|
|   TX| [6, 4, 6]|[0.019875458, 0.0...|          6|          4|0.019875458|0.082049528|
+-----+----------+--------------------+-----------+-----------+-----------+-----------+

может быть последним набором withColumn должен быть внутри al oop в зависимости от количества записей для каждого состояния.

Надеюсь, это поможет!

1 голос
/ 12 марта 2020

ОБНОВЛЕНИЕ

Я обнаружил функцию percentile_approx в контексте улья, поэтому вам не нужно использовать функции stat.

val states = Seq("CA", "AZ")
val cols = Seq("con_dist_1", "con_dist_2")

val l = cols.map(c => expr(s"percentile_approx($c, Array(0.25, 0.5, 0.75)) as ${c}_quantiles"))
val df2 = df.filter($"state".isin(states: _*)).groupBy("state").agg(l.head, l.tail: _*)

df2.select(col("state") +: cols.flatMap( c => (1 until 4).map( i => col(c + "_quantiles")(i - 1).alias(c + "_quantile_" + i))): _*).show(false)

Здесь я попробовал автоматизированный метод для данных states и cols. Результат будет:

+-----+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|state|con_dist_1_quantile_1|con_dist_1_quantile_2|con_dist_1_quantile_3|con_dist_2_quantile_1|con_dist_2_quantile_2|con_dist_2_quantile_3|
+-----+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+
|AZ   |4                    |4                    |4                    |0.816039063          |0.816039063          |0.816039063          |
|CA   |4                    |4                    |6                    |0.019875458          |0.082049528          |0.816039063          |
+-----+---------------------+---------------------+---------------------+---------------------+---------------------+---------------------+

Имейте в виду, что результат немного отличается от ожидаемого, потому что я установил states = Seq("CA", "AZ"), который вы дали.


ORIGINAL

Используйте Window для состояний и рассчитайте percent_rank для каждого столбца.

import org.apache.spark.sql.expressions.Window

val w1 = Window.partitionBy("state").orderBy("con_dist_1")
val w2 = Window.partitionBy("state").orderBy("con_dist_2")
df.withColumn("p1", percent_rank.over(w1))
  .withColumn("p2", percent_rank.over(w2))
  .show(false)

Вы можете сначала отфильтровать кадр данных, только для указанных c состояния. В любом случае, результат:

+---+---------+-----------+----------+-----------+-----+---+---+
|id |date     |revenue    |con_dist_1|con_dist_2 |state|p1 |p2 |
+---+---------+-----------+----------+-----------+-----+---+---+
|10 |1/15/2018|0.006628853|4         |0.816039063|AZ   |0.0|0.0|
|10 |1/15/2018|0.006628853|4         |0.816039063|AZ   |0.0|0.0|
|10 |1/15/2018|0.010680705|6         |0.019875458|CA   |1.0|0.0|
|10 |1/15/2018|0.01378215 |4         |0.082049528|CA   |0.0|0.5|
|10 |1/15/2018|0.006628853|4         |0.816039063|CA   |0.0|1.0|
|10 |1/15/2018|0.010680705|6         |0.019875458|TX   |0.5|0.0|
|10 |1/15/2018|0.010680705|6         |0.019875458|TX   |0.5|0.0|
|10 |1/15/2018|0.01378215 |4         |0.082049528|TX   |0.0|1.0|
+---+---------+-----------+----------+-----------+-----+---+---+
...