Вот одно из возможных решений, в котором столбец Content
будет массивом StructType
с двумя именованными полями: Content
и count
.
from pyspark.sql.functions import col, collect_list, desc, lit, struct
from functools import reduce
def transform(df, n):
return reduce(
lambda a, b: a.unionAll(b),
(
df.groupBy(c).count()\
.orderBy(desc("count"), c)\
.limit(n)\
.withColumn("Column", lit(c))\
.groupBy("Column")\
.agg(
collect_list(
struct(
col(c).cast("string").alias("Content"),
"count")
).alias("Content")
)
for c in df.columns
)
)
Эта функция будет перебирать каждый из столбцов входного кадра данных df
и подсчитывать вхождение каждого значения. Затем мы orderBy
считаем число (по убыванию) и значение столбца самостоятельно (в алфавитном порядке) и сохраняем только первые n
строки (limit(n)
).
Затем соберите значения в массив структур и, наконец, union
соберите результаты для каждого столбца. Поскольку union
требует, чтобы каждый DataFrame имел одну и ту же схему, вам необходимо преобразовать значение столбца в строку.
n = 3
df1 = transform(df, n)
df1.show(truncate=False)
#+-------+------------------------------------+
#|Column |Content |
#+-------+------------------------------------+
#|Animals|[[Cat,1], [Dog,1], [Elephant,1]] |
#|Food |[[Banana,2], [Meat,2], [Fish,1]] |
#|Home |[[Jungle,2], [Desert,1], [Garden,1]]|
#+-------+------------------------------------+
Это не точно тот же вывод, который вы просили, но, вероятно, будет достаточно для ваших нужд. (У Spark нет кортежей, как вы описали.) Вот новая схема:
df1.printSchema()
#root
# |-- Column: string (nullable = false)
# |-- Content: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- Content: string (nullable = true)
# | | |-- count: long (nullable = false)