Как добавить два столбца SparseVectors с помощью Scala? - PullRequest
0 голосов
/ 25 сентября 2018

Учитывая два столбца DataFrame объектов SparseVector, как вы можете добавить (т.е. добавить вектор) два столбца вместе, чтобы создать новый столбец?

Что-то вроде

df.columns
df: org.apache.spark.sql.DataFrame = [v1: SparseVector, v2: SparseVector]

df.withColumn("v3", ADD_COL_FUNCTION(col(v1), col(v2)))

Ответы [ 2 ]

0 голосов
/ 14 января 2019

Вот наше окончательное решение этой проблемы.

Сначала мы реализовали неявные преобразования между векторами Spark и Breeze, предоставленными в в этом посте (обратите внимание на исправления ошибок в комментариях).Это обеспечивает преобразования asBreeze и fromBreeze, используемые в приведенном ниже коде.

Затем мы определили функцию, которая позволяет добавлять столбцы разреженных векторов:

def addVectors(v1Col: String, v2Col: String, outputCol: String)
            : DataFrame => DataFrame = {
  df: DataFrame => {
    def add(v1: SparkVector, v2: SparkVector): SparkVector =
      (v1.asBreeze + v2.asBreeze).fromBreeze
    val func = udf((v1: SparkVector, v2: SparkVector) => add(v1, v2))
    df.withColumn(outputCol, func(col(v1Col), col(v2Col)))
  }
}

Эта функция вызывается с помощью:

 df.transform(addVectors(col1Name, col2name, colOutName))

Вы можете, конечно,хотите включить некоторые проверки на наличие имен столбцов и убедиться, что выходной столбец не перезаписывает ничего, что вы не хотите.

0 голосов
/ 25 сентября 2018

В Spark нет встроенной функции для SparseVectors.С объектами DenseVector можно разобраться, превратив их в массивы, но для SparseVector это может быть убийцей памяти.Вы можете интерпретировать SparseVectors как карту, а затем «добавить» карты вместе.

import org.apache.spark.mllib.linalg.{SparseVector, Vectors, Vector}

def addVecCols(v1: SparseVector, v2: SparseVector): Vector = {
  val map1: Map[Int, Double] = (v1.indices zip v1.values).toMap

  Vectors.sparse(v1. size, 
    (map1 ++ (v2.indices zip v2.values).toMap)
      .map{ case (k, v) => k -> (v + map1.getOrElse(k, 0d))}
      .toList
  )


val addVecUDF = udf((v1: SparseVector, v2: SparseVector) => addVecCols(v1, v2))

Обратите внимание, что в Spark 1.6 тип возвращаемого значения Vectors.sparse равен Vector, в то время как в Spark 2.X - SparseVector, поэтому соответствующим образом измените тип возвращаемого значения addVecCols,Кроме того, в 2.X вы можете использовать библиотеку ml вместо библиотеки mllib.

Использование этого в кадре данных будет

val outDF = inDF.withColumn("added", addVecUDF(col("one_vector"), col("other_vector")))
...