Как рассчитать количество существующей функции в SparseVector - PullRequest
0 голосов
/ 19 сентября 2018

У меня есть такой фрейм данных:

+---+-------+--------------+
| id|clicked|     features |
+---+-------+--------------+
|  7|  1.0  |(4,[3],[1.0]) |
|  8|  0.0  |(4,[2],[12.0])|
|  9|  0.0  |(4,[3],[0.1]) |
+---+-------+--------------+

, и я преобразовал его в СДР:

Row(id=7, clicked=1.0, features=SparseVector(4, {3: 1.0}))
Row(id=8, clicked=0.0, features=SparseVector(4, {2: 12.0}))
Row(id=9, clicked=0.0, features=SparseVector(4, {3: 0.1}))

Теперь я хочу найти количество существующих для каждой функции.Например, в моем dataframe / rdd столбец компонентов содержит 5 объектов (от 0 до 4).Поскольку объекты в индексах 0, 1 и 4 не содержат никаких значений, их значения равны 0. И индекс объектов 2 равен 1, индекс объектов 3 равен 2.

Я хочу получить эту информацию в структуре данных словаря,Как это сделать?

{0:0, 1:0, 2:1, 3:2, 4:0}

Я использую PySpark, но ответ с помощью Scala тоже подойдет.

1 Ответ

0 голосов
/ 19 сентября 2018

Лучше всего сделать подсчет до создания SparseVector.Если это невозможно, у вас по существу есть два варианта (до тех пор, пока VectorUDT s не будут легко преобразованы в массивы ).

В обоих случаях способ подсчета количества значений, которыеСуществовать для каждой функции одинаково.Выполните цикл в диапазоне размера SparseVector и проверьте, существует ли этот индекс в списке SparseVector.indices.Это вернет счетчики для всех объектов, в том числе те, где их число равно 0.

Более простым способом может быть создание кортежей в форме (index, 1) для каждого индекса в SparseVector.indices, но это исключит изокончательный вывод любых объектов без каких-либо значений.

Опция 1: Определить udf, explode и агрегат:

import pyspark.sql.functions as f

featureCount_udf = f.udf(
    lambda r: [(x, int(x in r.indices)) for x in range(r.size)],
    ArrayType(
        StructType(
            [
                StructField("featureNumber", IntegerType()),
                StructField("count", IntegerType())
            ]
        )
    )
)

df.select(f.explode(featureCount_udf("features")).alias("features"))\
    .select("features.*")\
    .groupBy("featureNumber")\
    .agg(f.sum("count").alias("count"))\
    .show()
#+-------------+-----+
#|featureNumber|count|
#+-------------+-----+
#|            0|    0|
#|            2|    1|
#|            1|    0|
#|            3|    2|
#+-------------+-----+

Опция 2: Преобразовать в rdd и flatMap:

from operator import add

df.select("features")\
    .rdd\
    .flatMap(
        lambda r: [(x, int(x in r["features"].indices)) for x in range(r["features"].size)]
    )\
    .reduceByKey(add)\
    .toDF(["featureNumber", "count"])\
    .show()
#+-------------+-----+
#|featureNumber|count|
#+-------------+-----+
#|            0|    0|
#|            2|    1|
#|            1|    0|
#|            3|    2|
#+-------------+-----+

Здесь мы flatMap каждый row в кортежи вида (featureNumber, containsValue).Затем мы можем позвонить reduceByKey, чтобы добавить переменную индикатора для каждой функции.


Исходный ответ

Если вы хотите получить вывод в словаре, вам придется вызватьcollect() в какой-то момент.

data = df.select("features").collect()

Теперь, когда у вас есть данные в виде списка pyspark.sql.Row с, вы можете перебирать и использовать .indices и .size, чтобы определить, какие столбцы имеют значения.

print([[int(x in r["features"].indices) for x in range(r["features"].size)] for r in data])
#[[0, 0, 0, 1], [0, 0, 1, 0], [0, 0, 0, 1]]

Из этого вы можете создать массив numpy и суммировать столбцы.Наконец, вызовите enumerate результата и передайте его конструктору dict.

Соберите все вместе:

mydict = dict(
    enumerate(
        np.array(
            [[int(x in r["features"].indices) for x in range(r["features"].size)]
             for r in data]
        ).sum(0)
    )
)
print(mydict)
#{0: 0, 1: 0, 2: 1, 3: 2}
...