Преобразовать строку формата libsvm («field1: value field2: value») в DenseVector значений - PullRequest
0 голосов
/ 19 июня 2019

У меня есть столбец в формате libsvm (мл библиотеки spark) field1:value field2:value ...

+--------------+-----+
|      features|label|
+--------------+-----+
|   a:1 b:2 c:3|    0|
|   a:4 b:5 c:6|    0|
|   a:7 b:8 c:9|    1|
|a:10 b:11 c:12|    0|
+--------------+-----+

Я хочу извлечь значения и сохранить их в массивах для каждой строки в pyspark

features.printSchema()

root
 |-- features: string (nullable = false)
 |-- label: integer (nullable = true)

Я использую следующий udf, потому что затронутый столбец является частью фрейма данных

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors

features_expl = udf(lambda features: Vectors.dense(features.split(" ")).map(lambda feat: float(str(feat.split(":")[1]))))
features=features.withColumn("feats", features_expl(features.features))

Результат, который я получаю: ValueError: не удалось преобразовать строку в число с плавающей точкой: mobile: 0.0не выполняет второе разбиение и вызывает float () для строки.

Я хотел бы получить:

+--------------+-----+
|      features|label|
+--------------+-----+
|     [1, 2, 3]|    0|
|     [4, 5, 6]|    0|
|     [7, 8, 9]|    1|
|  [10, 11, 12]|    0|
+--------------+-----+

1 Ответ

0 голосов
/ 19 июня 2019

У вас две основные проблемы с udf. Во-первых, это не работает так, как вы хотели. Рассмотрим суть вашего кода как следующую функцию:

from pyspark.ml.linalg import Vectors
def features_expl_non_udf(features): 
    return Vectors.dense(
        features.split(" ")).map(lambda feat: float(str(feat.split(":")[1]))
    )

Если вы позвоните по одной из ваших строк:

features_expl_non_udf("a:1 b:2 c:3")
#ValueError: could not convert string to float: a:1

Потому что features.split(" ") возвращает ['a:1', 'b:2', 'c:3'], который вы передаете конструктору Vectors.dense. Это не имеет никакого смысла.

То, что вы намеревались сделать, сначала разбить на пробел, а затем разбить каждое значение результирующего списка на :. Затем вы можете преобразовать эти значения в float и передать список в Vectors.dense.

Вот правильная реализация вашей логики:

def features_expl_non_udf(features): 
    return Vectors.dense(map(lambda feat: float(feat.split(":")[1]), features.split()))
features_expl_non_udf("a:1 b:2 c:3")
#DenseVector([1.0, 2.0, 3.0])

Теперь вторая проблема с вашим udf заключается в том, что вы не указали returnType. Для DenseVector вам нужно использовать VectorUDT в качестве returnType.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

features_expl = udf(
    lambda features: Vectors.dense(
        map(lambda feat: float(feat.split(":")[1]), features.split())
    ),
    VectorUDT()
)
features.withColumn("feats", features_expl(features.features)).show()
#+--------------+-----+----------------+
#|      features|label|           feats|
#+--------------+-----+----------------+
#|   a:1 b:2 c:3|    0|   [1.0,2.0,3.0]|
#|   a:4 b:5 c:6|    0|   [4.0,5.0,6.0]|
#|   a:7 b:8 c:9|    1|   [7.0,8.0,9.0]|
#|a:10 b:11 c:12|    0|[10.0,11.0,12.0]|
#+--------------+-----+----------------+

В качестве альтернативы вы можете выполнить обработку строки на стороне искры, используя regexp_replace и split, но вам все равно придется использовать udf преобразовать окончательный вывод в DenseVector.

from pyspark.sql.functions import regexp_replace, split, udf
from pyspark.ml.linalg import Vectors, VectorUDT

toDenseVector = udf(Vectors.dense, VectorUDT())

features.withColumn(
    "features",
    toDenseVector(
        split(regexp_replace("features", r"\w+:", ""), "\s+").cast("array<float>")
    )
).show()
#+----------------+-----+
#|        features|label|
#+----------------+-----+
#|   [1.0,2.0,3.0]|    0|
#|   [4.0,5.0,6.0]|    0|
#|   [7.0,8.0,9.0]|    1|
#|[10.0,11.0,12.0]|    0|
#+----------------+-----+
...