Я хочу перевести следующую процедуру из класса [Word2VecModel] https://github.com/apache/spark/blob/branch-2.3/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala в pyspark.
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val vectors = wordVectors.getVectors
.mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
.map(identity) // mapValues doesn't return a serializable map (SI-7005)
val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
val d = $(vectorSize)
val word2Vec = udf { sentence: Seq[String] =>
if (sentence.isEmpty) {
Vectors.sparse(d, Array.empty[Int], Array.empty[Double])
} else {
val sum = Vectors.zeros(d)
sentence.foreach { word =>
bVectors.value.get(word).foreach { v =>
BLAS.axpy(1.0, v, sum)
}
}
BLAS.scal(1.0 / sentence.size, sum)
sum
}
}
dataset.withColumn($(outputCol), word2Vec(col($(inputCol))))
}
Может ли кто-нибудь помочь мне преобразовать ее в эквивалентный pyspark код? Я попытался сделать часть фрагментов по частям, но не смог сложить их целиком.
Как я обнаружил внутреннюю реализацию BLAS.axpy (), которую я могу использовать для pyspark:
axpy(double a, Vector x, Vector y)
y += a * x
Аналогично BLAS.scal (), внутренняя логика c равна
scal(double a, Vector x)
x = a * x
Для функции идентификации scala Я создал такую же функцию в pyspark, что и у pyspark ее нет.
def identity(x):
return x
Я пытался преобразовать следующую строку
val vectors = wordVectors.getVectors
.mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
.map(identity)
И я придумал это, не знаете, как сделать vv.map (_. ToDouble) в pyspark? Это верно
vectors_final = model.getVectors().rdd.mapValues(lambda vv: Vectors.dense(vv)).map(lambda x: identity(x))
Спасибо.