Как агрегировать значения в массиве в pyspark? - PullRequest
2 голосов
/ 17 июня 2020

Spark версии 3.0

У меня есть такой фрейм данных

+-------------------------------------------------+
|named_val                                        |
+-------------------------------------------------+
|[[Alex, 1], [is, 1], [a, 1], [good, 1], [boy, 1]]|
|[[Bob, 1], [Bob, 1], [bad, 1], [Bob, 1]]         |
+-------------------------------------------------+

Мне нужно создать карту с подсчетом уникальных значений, как показано ниже

Ожидаемый результат

+-------------------------------------------------+
|named_val                                        |
+-------------------------------------------------+
|{Alex->1, is->1, a->1, good->1, boy->1}          |
|{Bob->3, bad->1}                                 |
+-------------------------------------------------+

Для воспроизведения кода используйте

df = spark.createDataFrame([([['Alex', 1], ['is', 1], ['a', 1], ['good', 1], ['boy', 1]],),([['Bob', 1], ['Bob', 1], ['bad', 1], ['Bob', 1]],)],['named_val'])

Ответы [ 2 ]

2 голосов
/ 17 июня 2020

А как насчет нашего старого друга UDF? Стоимость se / de должна быть низкой по сравнению с перетасовкой:

from pyspark.sql.functions import udf

def sum_merge(ar):
  d = dict()
  for i in ar:
    k, v = i[0], int(i[1])    
    d[k] = d[k] + v if k in d else v
  return d

sum_merge_udf = udf(sum_merge)

df.select(sum_merge_udf("named_val").alias("named_val"))

# +----------------------------------+
# |named_val                         |
# +----------------------------------+
# |{a=1, Alex=1, is=1, boy=1, good=1}|
# |{bad=1, Bob=3}                    |
# +----------------------------------+
2 голосов
/ 17 июня 2020

В scala, но python версия будет очень похожа:

val df =  Seq(Seq(("Alex",1),("is",1),("a",1),("good",1),("boy",1)),Seq(("Bob",1),("Bob",1),("bad",1),("Bob",1))).toDF()
df.show(false)
+-------------------------------------------------+
|value                                            |
+-------------------------------------------------+
|[[Alex, 1], [is, 1], [a, 1], [good, 1], [boy, 1]]|
|[[Bob, 1], [Bob, 1], [bad, 1], [Bob, 1]]         |
+-------------------------------------------------+


df.withColumn("id",monotonicallyIncreasingId)
.select('id,explode('value))
.select('id,'col.getField("_1").as("val"))
.groupBy('id,'val).agg(count('val).as("ct"))
.select('id,map('val,'ct).as("map"))
.groupBy('id).agg(collect_list('map))
.show(false)

+---+-----------------------------------------------------------+
|id |collect_list(map)                                          |
+---+-----------------------------------------------------------+
|0  |[[is -> 1], [Alex -> 1], [boy -> 1], [a -> 1], [good -> 1]]|
|1  |[[bad -> 1], [Bob -> 3]]                                   |
+---+-----------------------------------------------------------+
...