Фрейм данных Pyspark: количество элементов в массиве или списке - PullRequest
0 голосов
/ 28 сентября 2018

Давайте предположим, что датафрейм df:

df.show()

Вывод:

+------+----------------+
|letter| list_of_numbers|
+------+----------------+
|     A|    [3, 1, 2, 3]|
|     B|    [1, 2, 1, 1]|
+------+----------------+

Что я хочу сделать, это count количество определенного элемента в столбце list_of_numbers.Примерно так:

+------+----------------+----+
|letter| list_of_numbers|ones|
+------+----------------+----+
|     A|    [3, 1, 2, 3]|   1|
|     B|    [1, 2, 1, 1]|   3|
+------+----------------+----+

До сих пор я пытался создать udf, и это прекрасно работает, но мне интересно, могу ли я сделать это без определения udf.

Ответы [ 2 ]

0 голосов
/ 28 сентября 2018

Вы можете разбить массив и отфильтровать взорванные значения для 1.Затем groupBy и count:

from pyspark.sql.functions import col, count, explode

df.select("*", explode("list_of_numbers").alias("exploded"))\
    .where(col("exploded") == 1)\
    .groupBy("letter", "list_of_numbers")\
    .agg(count("exploded").alias("ones"))\
    .show()
#+------+---------------+----+
#|letter|list_of_numbers|ones|
#+------+---------------+----+
#|     A|   [3, 1, 2, 3]|   1|
#|     B|   [1, 2, 1, 1]|   3|
#+------+---------------+----+

Чтобы сохранить все строки, даже когда счетчик равен 0, вы можете преобразовать столбец exploded в переменную индикатора.Затем groupBy и sum.

from pyspark.sql.functions import col, count, explode, sum as sum_

df.select("*", explode("list_of_numbers").alias("exploded"))\
    .withColumn("exploded", (col("exploded") == 1).cast("int"))\
    .groupBy("letter", "list_of_numbers")\
    .agg(sum_("exploded").alias("ones"))\
    .show()

Обратите внимание, я импортировал pyspark.sql.functions.sum как sum_, чтобы не перезаписывать встроенную функцию sum.

0 голосов
/ 28 сентября 2018

Если предположить, что длина списка постоянна, я могу думать только о том,

from operator import add
from functools import reduce
import pyspark.sql.functions as F

df = sql.createDataFrame(
    [
        ['A',[3, 1, 2, 3]],
        ['B',[1, 2, 1, 1]]
    ],      
        ['letter','list_of_numbers'])

expr = reduce(add,[F.when(F.col('list_of_numbers').getItem(x)==1, 1)\
                    .otherwise(0) for x in range(4)])
df = df.withColumn('ones', expr)
df.show()

+------+---------------+----+
|letter|list_of_numbers|ones|
+------+---------------+----+
|     A|   [3, 1, 2, 3]|   1|
|     B|   [1, 2, 1, 1]|   3|
+------+---------------+----+
...