Spark < 2.4
You have to use a udf here, and in this case it is very slow, since the collected arrays have to be passed through a Python worker. :(
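For reference, a minimal df consistent with the result table shown further down could look like this (the rows are hypothetical, reconstructed from that output):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample rows, reverse-engineered from the result table below.
df = spark.createDataFrame(
    [('NYC', 'NYC', 1, 0, 0),
     ('DEN', 'CO', 1, 0, 0),
     ('DEN', 'CO', 1, 0, 0),
     ('DEN', 'CO', 2, 2, 99),
     ('DEN', 'CO', 3, 0, 0)],
    ['City', 'State', 'Hour', 'Score', 'Percentage'])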
import itertools
from pyspark.sql.functions import count, col, collect_list, collect_set, udf
from pyspark.sql.types import ArrayType, IntegerType

# Flattens a list of lists into a single list; without an explicit return type
# the udf would return a string instead of an array.
@udf(returnType=ArrayType(IntegerType()))
def flatten(col):
    return list(itertools.chain.from_iterable(col))

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'),
       collect_set(col('Percentage')).alias('Percentage'),
       count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'),
       collect_list(col('Score')).alias('total_scores'),
       collect_list(col('Percentage')).alias('total_perct'),
       collect_list(col('total_volume')).alias('total_volume')) \
  .select('City', 'State', 'total_hours',
          flatten(col('total_scores')).alias('total_scores'),
          flatten(col('total_perct')).alias('total_perct'),
          'total_volume') \
  .show(10, False)
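With the return type declared on the udf, this produces the same lists as the built-in flatten in the Spark 2.4+ version below; only the column naming in select differs.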
Spark 2.4+
Here the built-in flatten works directly on the results of collect_set and collect_list.
from pyspark.sql.functions import count, col, collect_list, collect_set, flatten

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'),
       collect_set(col('Percentage')).alias('Percentage'),
       count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'),
       flatten(collect_list(col('Score'))).alias('total_scores'),
       flatten(collect_list(col('Percentage'))).alias('total_perct.'),
       collect_list(col('total_volume')).alias('total_volume')) \
  .show(10, False)
+----+-----+-----------+------------+------------+------------+
|City|State|total_hours|total_scores|total_perct.|total_volume|
+----+-----+-----------+------------+------------+------------+
|NYC |NYC |[1] |[0] |[0] |[1] |
|DEN |CO |[1, 2, 3] |[0, 2, 0] |[0, 99, 0] |[2, 1, 1] |
+----+-----+-----------+------------+------------+------------+
If you do not apply orderBy at this step, the order of the values in the resulting lists will be mixed up. A more explicit way to control that order is sketched below.
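If you want the per-hour order to be guaranteed rather than relying on an orderBy before the second groupBy, one option (a sketch, still assuming Spark 2.4+ and the same df as above) is to collect one struct per hour, sort the struct array, and only then pull the individual fields back out:

from pyspark.sql.functions import count, col, collect_list, collect_set, struct, sort_array, flatten

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'),
       collect_set(col('Percentage')).alias('Percentage'),
       count(col('Hour')).alias('total_volume')) \
  .groupBy('City', 'State') \
  .agg(sort_array(collect_list(struct('Hour', 'Score', 'Percentage', 'total_volume'))).alias('rows')) \
  .select('City', 'State',
          col('rows.Hour').alias('total_hours'),
          flatten(col('rows.Score')).alias('total_scores'),
          flatten(col('rows.Percentage')).alias('total_perct.'),
          col('rows.total_volume').alias('total_volume')) \
  .show(10, False)

sort_array sorts the structs by their fields in order, so the arrays come out ordered by Hour regardless of how the rows are partitioned.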