Точечный продукт в Spark Scala - PullRequest
0 голосов
/ 07 марта 2020

У меня есть два фрейма данных в Spark Scala, где второй столбец каждого фрейма данных представляет собой массив чисел

val data22= Seq((1,List(0.693147,0.6931471)),(2,List(0.69314, 0.0)),(3,List(0.0, 0.693147))).toDF("ID","tf_idf")
data22.show(truncate=false)

+---+---------------------+
|ID |tf_idf               |
+---+---------------------+
|1  |[0.693, 0.702]       |
|2  |[0.69314, 0.0]       |
|3  |[0.0, 0.693147]      |
+---+---------------------+



val data12= Seq((1,List(0.69314,0.6931471))).toDF("ID","tf_idf")
data12.show(truncate=false)

+---+--------------------+
|ID |tf_idf              |
+---+--------------------+
|1  |[0.693, 0.805]      |
+---+--------------------+

Мне нужно выполнить точечное произведение между строками в этих двух кадрах данных. То есть мне нужно умножить массив tf_idf на data12 с каждой строкой tf_idf в data22.

(Пример: первая строка в точечном произведении должна быть такой: 0,693 * 0,693 + 0,702 * 0,805

вторая строка: 0,69314 * 0,693 + 0,0 * 0,805

третья строка: 0,0 * 0,693 + 0,693147 * 0,805)

По сути, я хочу что-то (например, умножение матриц) data22*transpose(data12) Буду признателен, если кто-то может предложить метод для этого в Spark Scala ,

Спасибо

Ответы [ 2 ]

4 голосов
/ 07 марта 2020

Spark Version 2.4 +: Используйте несколько функций для массива, таких как zip_with и aggregate, которые дают вам более простой код. Чтобы следовать вашему подробному описанию, я изменил join на crossJoin.

val data22= Seq((1,List(0.693147,0.6931471)),(2,List(0.69314, 0.0)),(3,List(0.0, 0.693147))).toDF("ID","tf_idf")
val data12= Seq((1,List(0.693,0.805))).toDF("ID2","tf_idf2")

val df = data22.crossJoin(data12).drop("ID2")
df.withColumn("DotProduct", expr("aggregate(zip_with(tf_idf, tf_idf2, (x, y) -> x * y), 0D, (sum, x) -> sum + x)")).show(false)

Вот результат.

+---+---------------------+--------------+-------------------+
|ID |tf_idf               |tf_idf2       |DotProduct         |
+---+---------------------+--------------+-------------------+
|1  |[0.693147, 0.6931471]|[0.693, 0.805]|1.0383342865       |
|2  |[0.69314, 0.0]       |[0.693, 0.805]|0.48034601999999993|
|3  |[0.0, 0.693147]      |[0.693, 0.805]|0.557983335        |
+---+---------------------+--------------+-------------------+
1 голос
/ 07 марта 2020

Решение показано ниже:

scala> val data22= Seq((1,List(0.693147,0.6931471)),(2,List(0.69314, 0.0)),(3,List(0.0, 0.693147))).toDF("ID","tf_idf")
data22: org.apache.spark.sql.DataFrame = [ID: int, tf_idf: array<double>]

scala> val data12= Seq((1,List(0.69314,0.6931471))).toDF("ID","tf_idf")
data12: org.apache.spark.sql.DataFrame = [ID: int, tf_idf: array<double>]

scala> val arrayDot = data12.take(1).map(row => (row.getAs[Int](0), row.getAs[WrappedArray[Double]](1).toSeq))
arrayDot: Array[(Int, Seq[Double])] = Array((1,WrappedArray(0.69314, 0.6931471)))

scala> val dotColumn = arrayDot(0)._2
dotColumn: Seq[Double] = WrappedArray(0.69314, 0.6931471)

scala> val dotUdf = udf((y: Seq[Double]) => y zip dotColumn map(z => z._1*z._2) reduce(_ + _))
dotUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(ArrayType(DoubleType,false))))

scala> data22.withColumn("dotProduct", dotUdf('tf_idf)).show
+---+--------------------+-------------------+
| ID|              tf_idf|         dotProduct|
+---+--------------------+-------------------+
|  1|[0.693147, 0.6931...|   0.96090081381841|
|  2|      [0.69314, 0.0]|0.48044305959999994|
|  3|     [0.0, 0.693147]|    0.4804528329237|
+---+--------------------+-------------------+

Обратите внимание, что оно умножается на массив tf_idf в data12 на каждую строку tf_idf в data22.

Дайте мне знать, если это поможет !!

...