Я пытаюсь определить pdf udf, который позволяет создавать SparseVectors
из столбца словарей. Ниже приведен пример
from pyspark.sql import Row
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import *
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
# create example data
dff = spark.createDataFrame([Row(features=Row(indices=[1,2], size=10, values = [11,12])),
Row(features=Row(indices=[3,4], size=10, values = [13,14])),
Row(features=Row(indices=[5,6,7], size=10, values = [15,16,17]))
])
print(dff.printSchema())
# access values in the struct
dff.withColumn('sparse', col('features')['size'])
Я могу получить доступ к отдельным парам значений ключей в столбце функций, поэтому я использую rdd.map для создания SparseVectors
.
# create sparse vectors using rdd.map works fine
dff.rdd.map(lambda x: SparseVector(x.features['size'],
x.features['indices'],
x.features['values'])).collect()
. Я бы хотелсделать то же самое, не используя rdd. Я пытался использовать .withColumn
.
# trying using withColumn and SparseVector
dff.withColumn('sparse', SparseVector(col('features')['size'],
col('features')['indices'],
col('features')['values']))
, но получаю ошибку ниже.
TypeError: int() argument must be a string, a bytes-like object or a number, not 'Column'
Я пытался определить udf
s ниже.
# create sparse vector using column of dictionaries and udfs
#@udf
#def create_s_vector(x):
# return SparseVector(x['size'],x['indices'],x['values'])
# not sure whats the proper returnType
@pandas_udf(VectorUDT(), PandasUDFType.SCALAR)
def create_s_vector(x_iter):
for x in x_iter:
yield SparseVector(x['size'],x['indices'],x['values'])
# try using udf
dff.withColumn('sparse', create_s_vector(col('features')))
С помощьюПриведенный выше код я получаю сообщение об ошибке, что тип returnType не поддерживается. Спасибо!