Лучше всего сделать подсчет до создания SparseVector
.Если это невозможно, у вас по существу есть два варианта (до тех пор, пока VectorUDT
s не будут легко преобразованы в массивы ).
В обоих случаях способ подсчета количества значений, которыеСуществовать для каждой функции одинаково.Выполните цикл в диапазоне размера SparseVector
и проверьте, существует ли этот индекс в списке SparseVector.indices
.Это вернет счетчики для всех объектов, в том числе те, где их число равно 0.
Более простым способом может быть создание кортежей в форме (index, 1)
для каждого индекса в SparseVector.indices
, но это исключит изокончательный вывод любых объектов без каких-либо значений.
Опция 1: Определить udf
, explode
и агрегат:
import pyspark.sql.functions as f
featureCount_udf = f.udf(
lambda r: [(x, int(x in r.indices)) for x in range(r.size)],
ArrayType(
StructType(
[
StructField("featureNumber", IntegerType()),
StructField("count", IntegerType())
]
)
)
)
df.select(f.explode(featureCount_udf("features")).alias("features"))\
.select("features.*")\
.groupBy("featureNumber")\
.agg(f.sum("count").alias("count"))\
.show()
#+-------------+-----+
#|featureNumber|count|
#+-------------+-----+
#| 0| 0|
#| 2| 1|
#| 1| 0|
#| 3| 2|
#+-------------+-----+
Опция 2: Преобразовать в rdd
и flatMap
:
from operator import add
df.select("features")\
.rdd\
.flatMap(
lambda r: [(x, int(x in r["features"].indices)) for x in range(r["features"].size)]
)\
.reduceByKey(add)\
.toDF(["featureNumber", "count"])\
.show()
#+-------------+-----+
#|featureNumber|count|
#+-------------+-----+
#| 0| 0|
#| 2| 1|
#| 1| 0|
#| 3| 2|
#+-------------+-----+
Здесь мы flatMap
каждый row
в кортежи вида (featureNumber, containsValue)
.Затем мы можем позвонить reduceByKey
, чтобы добавить переменную индикатора для каждой функции.
Исходный ответ
Если вы хотите получить вывод в словаре, вам придется вызватьcollect()
в какой-то момент.
data = df.select("features").collect()
Теперь, когда у вас есть данные в виде списка pyspark.sql.Row
с, вы можете перебирать и использовать .indices
и .size
, чтобы определить, какие столбцы имеют значения.
print([[int(x in r["features"].indices) for x in range(r["features"].size)] for r in data])
#[[0, 0, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]]
Из этого вы можете создать массив numpy
и суммировать столбцы.Наконец, вызовите enumerate
результата и передайте его конструктору dict
.
Соберите все вместе:
mydict = dict(
enumerate(
np.array(
[[int(x in r["features"].indices) for x in range(r["features"].size)]
for r in data]
).sum(0)
)
)
print(mydict)
#{0: 0, 1: 0, 2: 1, 3: 2}