Подсчет экземпляров значений в списках в одном столбце - PullRequest
0 голосов
/ 08 мая 2020

Я новичок в PySpark и изо всех сил пытаюсь выполнить то, что, по моему мнению, должно быть простой задачей ...

У меня есть фрейм данных PySpark, где 1 столбец состоит из списков строк. Я хотел бы подсчитать количество экземпляров каждого элемента в каждом списке строк во всех строках. Псевдокод будет выглядеть следующим образом:

counter = Counter()
for attr_list in df['attr_list']:
   counter.update(attr_list)

Другой способ сделать это - объединить все списки по всем строкам и построить счетчик из одного огромного списка.

Is есть ли эффективный способ сделать это в PySpark?

Правильный вывод будет состоять из одного объекта collections.Counter (), заполненного количеством вхождений каждого элемента во всех списках по всем столбцам, то есть если для для данного столбца в строке 1 есть список ['a', 'b', 'c'], а в строке 2 есть список ['b', 'c', 'd'], мы получим счетчик который выглядит как {'a': 1, 'b': 2, 'c': 2, 'd': 1}.

Thx!

Ответы [ 3 ]

2 голосов
/ 09 мая 2020

Если вы знаете elements, которое вам нужно посчитать, вы можете использовать его с spark2.4+., и это будет очень быстро. (Используя higher order function filter и structs)

df.show()

#+------------+
#|    atr_list|
#+------------+
#|[a, b, b, c]|
#|   [b, c, d]|
#+------------+

elements=['a','b','c','d']

from pyspark.sql import functions as F
collected=df.withColumn("struct", F.struct(*[(F.struct(F.expr("size(filter(atr_list,x->x={}))"\
                                                    .format("'"+y+"'"))).alias(y)) for y in elements]))\
            .select(*[F.sum(F.col("struct.{}.col1".format(x))).alias(x) for x in elements])\
            .collect()[0]

{elements[i]: [x for x in collected][i] for i in range(len(elements))} 

Out: {'a': 1, 'b': 3, 'c': 2, 'd': 1}

2-й метод с использованием transform, aggregate, explode and groupby (не требуется указать элементы):

from pyspark.sql import functions as F

a=df.withColumn("atr", F.expr("""transform(array_distinct(atr_list),x->aggregate(atr_list,0,(acc,y)->\
                               IF(y=x, acc+1,acc)))"""))\
  .withColumn("zip", F.explode(F.arrays_zip(F.array_distinct("atr_list"),("atr"))))\
  .select("zip.*").withColumnRenamed("0","elements")\
  .groupBy("elements").agg(F.sum("atr").alias("sum"))\
  .collect()

{a[i][0]: a[i][1] for i in range(len(a))} 
0 голосов
/ 08 мая 2020

Один из вариантов преобразования в RDD - объединить все массивы в один, а затем использовать на нем объект Counter.

from collections import Counter
all_lists = df.select('listCol').rdd
print(Counter(all_lists.map(lambda x: [i for i in x[0]]).reduce(lambda x,y: x+y)))

Другой вариант с explode и groupBy и объединение результата с dictionary.

from pyspark.sql.functions import explode
explode_df = df.withColumn('exploded_list',explode(df.listCol))
counts = explode_df.groupBy('exploded_list').count()
counts_tuple = counts.rdd.reduce(lambda a,b : a+b)
print({counts_tuple[i]:counts_tuple[i+1] for i in range(0,len(counts_tuple)-1,2)})
0 голосов
/ 08 мая 2020

Вы можете попробовать использовать методы distinct и flatMap для rdd, для этого просто преобразуйте столбец в rdd и выполните эти операции.

counter = (df
           .select("attr_list")
           .rdd
           # join all strings in the list and then split to get each word
           .map(lambda x: " ".join(x).split(" ")) 
           .flatMap(lambda x: x)
           # make a tuple for each word so later it can be grouped by to get its frequency count
           .map(lambda x: (x, 1))
           .reduceByKey(lambda a,b: a+b)
           .collectAsMap())
...