Вычисление расстояния редактирования в последовательных строках `Spark Dataframe - PullRequest
0 голосов
/ 17 ноября 2018

У меня есть фрейм данных следующим образом:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import spark.implicits._

// some data...
val df = Seq(
  (1, "AA", "BB", ("AA", "BB")),
  (2, "AA", "BB", ("AA", "BB")),
  (3, "AB", "BB", ("AB", "BB"))
).toDF("id","name", "surname", "array")

df.show()

, и я рассчитываю рассчитать расстояние редактирования между столбцом «массив» в следующей строке.В качестве примера я хочу вычислить расстояние редактирования между объектом «массив» в столбце 1 («AA», «BB») и объектом «массив» в столбце 2 («AA», «BB»).Вот функция редактирования расстояния, которую я использую:

def editDist2[A](a: Iterable[A], b: Iterable[A]): Int = {
  val startRow = (0 to b.size).toList
  a.foldLeft(startRow) { (prevRow, aElem) =>
    (prevRow.zip(prevRow.tail).zip(b)).scanLeft(prevRow.head + 1) {
      case (left, ((diag, up), bElem)) => {
        val aGapScore = up + 1
        val bGapScore = left + 1
        val matchScore = diag + (if (aElem == bElem) 0 else 1)
        List(aGapScore, bGapScore, matchScore).min
      }
    }
  }.last
}

Я знаю, что мне нужно создать UDF для этой функции, но не могу этого сделать.Если я использую функцию как есть и использую Spark Windowing, чтобы получить в предыдущей строке:

// creating window - ordered by ID
val window = Window.orderBy("id")

// using the window with lag function to compare to previous value in each column
df.withColumn("edit-d", editDist2(($"array"), lag("array", 1).over(window))).show()

, я получаю следующую ошибку:

<console>:245: error: type mismatch;
 found   : org.apache.spark.sql.ColumnName
 required: Iterable[?]
       df.withColumn("edit-d", editDist2(($"array"), lag("array", 1).over(window))).show()

1 Ответ

0 голосов
/ 20 ноября 2018

Я понял, что вы можете использовать для этого собственную функцию Спарка Левенштейна.Эта функция принимает две строки для сравнения, поэтому ее нельзя использовать с массивом.

// creating window - ordered by ID
val window = Window.orderBy("id")

// using the window with lag function to compare to previous value in each column
df.withColumn("edit-d", levenshtein(($"name"), lag("name", 1).over(window)) + levenshtein(($"surname"), lag("surname", 1).over(window))).show()

, давая желаемый результат:

+---+----+-------+--------+------+
| id|name|surname|   array|edit-d|
+---+----+-------+--------+------+
|  1|  AA|     BB|[AA, BB]|  null|
|  2|  AA|     BB|[AA, BB]|     0|
|  3|  AB|     BB|[AB, BB]|     1|
+---+----+-------+--------+------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...