Как заменить нули в столбце Вектор? - PullRequest
0 голосов
/ 07 июня 2018

У меня есть столбец типа [вектор], и в нем есть нулевые значения, от которых я не могу избавиться, вот пример

import org.apache.spark.mllib.linalg.Vectors

val sv1: Vector = Vectors.sparse(58, Array(8, 45), Array(1.0, 1.0))
val df_1 = sc.parallelize(List(("id_1", sv1))).toDF("id", "feature_vector")
val df_2 = sc.parallelize(List(("id_1", 10.0), ("id_2", 10.0))).toDF("id", "numeric_feature")

val df_joined = df_1.join(df_2, Seq("id"), "right")

df_joined.show()

+----+--------------------+---------------+
|  id|      feature_vector|numeric_feature|
+----+--------------------+---------------+
|id_1|(58,[8,45],[1.0,1...|           10.0|
|id_2|                null|           10.0|
+----+--------------------+---------------+

Что я хотел бы сделать:

val map = Map("feature_vector" -> sv1)
val result = df_joined.na.fill(map)

Но это выдает ошибку:

Message: Unsupported value type org.apache.spark.mllib.linalg.SparseVector ((58,[8,45],[1.0,1.0])).

Другие вещи, которые я пробовал:

df_joined.withColumn("feature_vector", when(col("feature_vector").isNull, sv1).otherwise(sv1)).show

из как отфильтровать нулевое значение изфрейм данных spark

Я изо всех сил пытаюсь найти решение, которое будет работать на Spark 1.6

Ответы [ 2 ]

0 голосов
/ 07 июня 2018

Вы можете воспользоваться помощью СДР здесь, если хотите:

val naFillRDD = df_joined.map{ r => r match{
  case Row(id, feature_vector: Vector, numeric_feature ) => Row(id, feature_vector, numeric_feature )
  case Row(id, _, numeric_feature) => Row(id, sv1, numeric_feature)
}}

, а затем переключиться обратно на фрейм данных:

val naFillDF = sqlContext.createDataFrame(naFillRDD, df_joined.schema)

naFillDF.show(false)
//+----+---------------------+---------------+
//|id  |feature_vector       |numeric_feature|
//+----+---------------------+---------------+
//|id_1|(58,[8,45],[1.0,1.0])|10.0           |
//|id_2|(58,[8,45],[1.0,1.0])|10.0           |
//+----+---------------------+---------------+
0 голосов
/ 07 июня 2018

Объединение и соединение должны сделать трюк

import org.apache.spark.sql.functions.{coalesce, broadcast}

val fill = Seq(
  Tuple1(Vectors.sparse(58, Array(8, 45), Array(1.0, 1.0)))
).toDF("fill")


df_joined
  .join(broadcast(fill))
  .withColumn("feature_vector", coalesce($"feature_vector", $"fill"))
  .drop("fill")
...