Группировка и агрегирование дважды - PullRequest
1 голос
/ 04 августа 2020

У меня есть фрейм данных, который выглядит так:

City     State     Hour   Score     Percentage
DEN      CO        1      0         0
DEN      CO        1      0         0
DEN      CO        2      2         99
DEN      CO        3      0         0
NYC      NYC       1      0         0

Я хочу, чтобы он выглядел так:

City     State     total_hours  total_scores  total_perct.   total_volume      
DEN      CO        [1,2,3]      [0,2,0]       [0,99,0]       [2,1,1]
NYC      NYC       [1]          [0]           [0]            [1]

Для total_hours Я просто делаю collect_set для City и State для total_scores Я делаю collect_set для каждого указанного c часа, а затем собираю все баллы за все часы. Пример: Есть 2 оценки за час 1 Den CO, 0 и 0, я беру только одну из них, а затем за час 2 набирается 1, так что получается [0,2]. То же самое с total_perct. Для total_volume, я веду счет за каждый час, и я делаю collect_list за все часы одного и того же города и штата.

Это в основном то, что мне нужно достигать. Если я сделаю groupBy вот так:

df.groupBy("city", "state", "hour")
  .agg(collect_set("Hour").alias("total_hours"), collect_set("Score").alias("total_scores"), 
       collect_set("Percentage").alias("total_perct."), count("hour").alias("total_volume"))

, я получу следующий фрейм данных:

City     State     total_hours  total_scores  total_perct.   total_volume 
DEN      CO        [1]          [0]           [0]            2
DEN      CO        [2]          [2]           [99]           1
DEN      CO        [3]          [0]           [0]            1
NYC      NYC       [1]          [0]           [0]            1

Я не понимаю, что делать дальше. Как мне получить то, что есть сейчас, и добиться конечного результата? Я использую PySpark.

Ответы [ 2 ]

1 голос
/ 04 августа 2020

Другой способ - использовать окно для подсчета появления Hour, затем отфильтровать 1 индекс (idx) на основе раздела, а затем groupby + collect_list

import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.partitionBy("City","State","Hour")
l = ['Hour','Score', 'Percentage', 'Volume']

(df.withColumn("idx",F.monotonically_increasing_id()).select("*",
            F.count("Hour").over(w).alias("Volume"),F.max("idx").over(w).alias("Indx"))
            .filter(F.col("idx")==F.col("Indx")).orderBy("idx").groupBy("City","State")
            .agg(*[F.collect_list(i).alias(f"total_{i}") for i in l])).show()

Вывод:

+----+-----+----------+-----------+----------------+------------+
|City|State|total_Hour|total_Score|total_Percentage|total_Volume|
+----+-----+----------+-----------+----------------+------------+
| NYC|  NYC|       [1]|        [0]|             [0]|         [1]|
| DEN|   CO| [1, 2, 3]|  [0, 2, 0]|      [0, 99, 0]|   [2, 1, 1]|
+----+-----+----------+-----------+----------------+------------+
1 голос
/ 04 августа 2020

Spark <2.4 </strong>

Необходимо использовать udf, но в этом случае он очень медленный. : (

import itertools
from pyspark.sql.functions import max, count, col, collect_list, collect_set, udf
from pyspark.sql.types import ArrayType, IntegerType

@udf
def flatten(col):
    return list(itertools.chain.from_iterable(col))

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'), collect_set(col('Percentage')).alias('Percentage'), count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'), collect_list(col('Score')).alias('total_scores'), collect_list(col('Percentage')).alias('total_perct'), collect_list(col('total_volume')).alias('total_volume')) \
  .select('City', 'State', 'total_hours', flatten(col('total_scores')), flatten(col('total_perct')), 'total_volume') \
  .show(10, False)

Spark 2.4 +

Хорошо, это работает с collect_set и collect_list.

from pyspark.sql.functions import max, count, col, collect_list, flatten

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'), collect_set(col('Percentage')).alias('Percentage'), count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'), flatten(collect_list(col('Score'))).alias('total_scores'), flatten(collect_list(col('Percentage'))).alias('total_perct.'), collect_list(col('total_volume')).alias('total_volume')) \
  .show(10, False)

+----+-----+-----------+------------+------------+------------+
|City|State|total_hours|total_scores|total_perct.|total_volume|
+----+-----+-----------+------------+------------+------------+
|NYC |NYC  |[1]        |[0]         |[0]         |[1]         |
|DEN |CO   |[1, 2, 3]  |[0, 2, 0]   |[0, 99, 0]  |[2, 1, 1]   |
+----+-----+-----------+------------+------------+------------+

Если вы не указали orderBy на этом шаге, тогда порядок в списке результатов будет смешанным.

...