У вас две основные проблемы с udf
. Во-первых, это не работает так, как вы хотели. Рассмотрим суть вашего кода как следующую функцию:
from pyspark.ml.linalg import Vectors
def features_expl_non_udf(features):
return Vectors.dense(
features.split(" ")).map(lambda feat: float(str(feat.split(":")[1]))
)
Если вы позвоните по одной из ваших строк:
features_expl_non_udf("a:1 b:2 c:3")
#ValueError: could not convert string to float: a:1
Потому что features.split(" ")
возвращает ['a:1', 'b:2', 'c:3']
, который вы передаете конструктору Vectors.dense
. Это не имеет никакого смысла.
То, что вы намеревались сделать, сначала разбить на пробел, а затем разбить каждое значение результирующего списка на :
. Затем вы можете преобразовать эти значения в float
и передать список в Vectors.dense
.
Вот правильная реализация вашей логики:
def features_expl_non_udf(features):
return Vectors.dense(map(lambda feat: float(feat.split(":")[1]), features.split()))
features_expl_non_udf("a:1 b:2 c:3")
#DenseVector([1.0, 2.0, 3.0])
Теперь вторая проблема с вашим udf
заключается в том, что вы не указали returnType
. Для DenseVector
вам нужно использовать VectorUDT
в качестве returnType
.
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT
features_expl = udf(
lambda features: Vectors.dense(
map(lambda feat: float(feat.split(":")[1]), features.split())
),
VectorUDT()
)
features.withColumn("feats", features_expl(features.features)).show()
#+--------------+-----+----------------+
#| features|label| feats|
#+--------------+-----+----------------+
#| a:1 b:2 c:3| 0| [1.0,2.0,3.0]|
#| a:4 b:5 c:6| 0| [4.0,5.0,6.0]|
#| a:7 b:8 c:9| 1| [7.0,8.0,9.0]|
#|a:10 b:11 c:12| 0|[10.0,11.0,12.0]|
#+--------------+-----+----------------+
В качестве альтернативы вы можете выполнить обработку строки на стороне искры, используя regexp_replace
и split
, но вам все равно придется использовать udf
преобразовать окончательный вывод в DenseVector
.
from pyspark.sql.functions import regexp_replace, split, udf
from pyspark.ml.linalg import Vectors, VectorUDT
toDenseVector = udf(Vectors.dense, VectorUDT())
features.withColumn(
"features",
toDenseVector(
split(regexp_replace("features", r"\w+:", ""), "\s+").cast("array<float>")
)
).show()
#+----------------+-----+
#| features|label|
#+----------------+-----+
#| [1.0,2.0,3.0]| 0|
#| [4.0,5.0,6.0]| 0|
#| [7.0,8.0,9.0]| 1|
#|[10.0,11.0,12.0]| 0|
#+----------------+-----+