Spark преобразует массив структур в вектор для евклидова расстояния - PullRequest
1 голос
/ 01 июля 2019

Привет У меня есть следующий столбец набора данных:

+-----------------------+
|hashes                 |
+-----------------------+
|[[-7.0], [0.0], [5.0]] |
|[[-8.0], [1.0], [1.0]] |
|[[-6.0], [1.0], [1.0]] |
+-----------------------+

, который был сгенерирован:

val brp = new BucketedRandomProjectionLSH().
          setBucketLength(2).
          setNumHashTables(3).
          setInputCol("features").
          setOutputCol("hashes")

    val model = brp.fit(dfVa)
    val dfHash = model.transform(dfVa)

Со следующей схемой:

|-- hashes: array (nullable = true)
 |    |-- element: vector (containsNull = true)

IЯ хотел бы сделать перекрестное соединение с другим набором данных с тем же столбцом и вычислить евклидово расстояние с помощью UDF, который я сделал:

val euclideanDistance = udf { (v1: Vector, v2: Vector) =>
        sqrt(Vectors.sqdist(v1, v2))
}

cookesWb
   .join(cookesNext)
   .withColumn("Distance", euclideanDistance(
        cookesWb.col("hashes"),
        broadcast(cookesNext.col("hashes"))
   ))
   .filter(col("Distance").lt(80))

Однако я получаю следующую ошибку:

cannot resolve 'UDF(hashes, hashes)' due to data type mismatch: argument 1 requires vector type, however, '`hashes`' is of array<struct<type:tinyint,size:int,indices:array<int>,values:array<double>>>  

Знаете ли вы, как преобразовать этот грязный тип в Вектор, чтобы я мог выполнить функцию?
Спасибо.

1 Ответ

1 голос
/ 01 июля 2019

Здесь у вас есть массив векторов sparkML.Чтобы иметь возможность использовать свой UDF, сначала нужно преобразовать его в вектор.Мы можем определить другой UDF для этого.

import scala.collection.mutable.WrappedArray
import org.apache.spark.ml.linalg.{Vector, Vectors}
val toVect = udf { (x : WrappedArray[Vector]) =>
    // we flatten the array of vectors
    val flatArray : Array[Double] = x.flatMap(_.toArray).toArray 
    Vectors.dense(flatArray)
}

NB: Array[Vector] не будет работать здесь.Когда вы манипулируете массивами в spark и используете UDF, WrappedArray - это тип, который вам нужно использовать.

Тогда вы можете просто выполнить свой crossJoin, например, так:

df
  .crossJoin(df2)
  .withColumn("d", euclideanDistance(toVect(df.col("hashes")),
                                     toVect(df2.col("hashes"))))
  .show()
...