UDF Pandas, которая создает SparseVectors - PullRequest
1 голос
/ 10 октября 2019

Я пытаюсь определить pdf udf, который позволяет создавать SparseVectors из столбца словарей. Ниже приведен пример

from pyspark.sql import Row
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import *

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

# create example data
dff = spark.createDataFrame([Row(features=Row(indices=[1,2], size=10, values = [11,12])),
                             Row(features=Row(indices=[3,4], size=10, values = [13,14])),
                             Row(features=Row(indices=[5,6,7], size=10, values = [15,16,17]))
                            ])

print(dff.printSchema())

# access values in the struct
dff.withColumn('sparse', col('features')['size'])

Я могу получить доступ к отдельным парам значений ключей в столбце функций, поэтому я использую rdd.map для создания SparseVectors.

# create sparse vectors using rdd.map works fine
dff.rdd.map(lambda x: SparseVector(x.features['size'],
                                   x.features['indices'],
                                   x.features['values'])).collect()

. Я бы хотелсделать то же самое, не используя rdd. Я пытался использовать .withColumn.

# trying using withColumn and SparseVector
dff.withColumn('sparse', SparseVector(col('features')['size'],
                                      col('features')['indices'],
                                      col('features')['values']))

, но получаю ошибку ниже.

TypeError: int() argument must be a string, a bytes-like object or a number, not 'Column'

Я пытался определить udf s ниже.

# create sparse vector using column of dictionaries and udfs
#@udf
#def create_s_vector(x):
#    return SparseVector(x['size'],x['indices'],x['values'])

# not sure whats the proper returnType
@pandas_udf(VectorUDT(), PandasUDFType.SCALAR)
def create_s_vector(x_iter):
    for x in x_iter:
        yield SparseVector(x['size'],x['indices'],x['values'])

# try using udf
dff.withColumn('sparse', create_s_vector(col('features')))

С помощьюПриведенный выше код я получаю сообщение об ошибке, что тип returnType не поддерживается. Спасибо!

...