Подход грубой силы в Scala, который не работает над строками и рассматривает все как строчные буквы, может быть добавлен, но это для другого дня.Полагается не на то, чтобы пытаться исследовать строки, а на том, чтобы определить нграммы такими, какие они есть, нграммы против нграмм и генерировать их, а затем объединять и считать, причем внутреннее соединение имеет значение только для них.Добавлены некоторые дополнительные данные для подтверждения соответствия.
import org.apache.spark.ml.feature._
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType,ArrayType,LongType,StringType}
import spark.implicits._
// Sample data, duplicates and items to check it works.
val dfPostsInit = Seq(
( "Hello!!, Stack overflow users, Do you know spark scala users."),
( "Spark scala is very fast,"),
( "Users in stack are good in spark"),
( "Users in stack are good in spark"),
( "xy z"),
( "x yz"),
( "ABC"),
( "abc"),
( "XYZ,!!YYY@#$ Hello Bob..."))
.toDF("posting")
val dfWordsInit = Seq(("Stack"), ("Stack Overflow"),("users"), ("spark scala"), ("xyz"), ("xy"), ("not found"), ("abc")).toDF("words")
val dfWords = dfWordsInit.withColumn("words_perm" ,regexp_replace(dfWordsInit("words"), " ", "^")).withColumn("lower_words_perm" ,lower(regexp_replace(dfWordsInit("words"), " ", "^")))
val dfPostsTemp = dfPostsInit.map(r => (r.getString(0), r.getString(0).split("\\W+").toArray ))
// Tidy Up
val columnsRenamed = Seq("posting", "posting_array")
val dfPosts = dfPostsTemp.toDF(columnsRenamed: _*)
// Generate Ngrams up to some limit N - needs to be set. This so that we can count properly via a JOIN direct comparison. Can parametrize this in calls below.
// Not easy to find string matching over Array and no other answer presented.
def buildNgrams(inputCol: String = "posting_array", n: Int = 3) = {
val ngrams = (1 to n).map(i =>
new NGram().setN(i)
.setInputCol(inputCol).setOutputCol(s"${i}_grams")
)
new Pipeline().setStages((ngrams).toArray)
}
val suffix:String = "_grams"
var i_grams_Cols:List[String] = Nil
for(i <- 1 to 3) {
val iGCS = i.toString.concat(suffix)
i_grams_Cols = i_grams_Cols ::: List(iGCS)
}
// Generate data for checking against later from via rows only and thus not via columns, positional dependency counts, hence permutations.
val dfPostsNGrams = buildNgrams().fit(dfPosts).transform(dfPosts)
val dummySchema = StructType(
StructField("phrase", StringType, true) :: Nil)
var dfPostsNGrams2 = spark.createDataFrame(sc.emptyRDD[Row], dummySchema)
for (i <- i_grams_Cols) {
val nameCol = col({i})
dfPostsNGrams2 = dfPostsNGrams2.union (dfPostsNGrams.select(explode({nameCol}).as("phrase")).toDF )
}
val dfPostsNGrams3 = dfPostsNGrams2.withColumn("lower_phrase_concatenated",lower(regexp_replace(dfPostsNGrams2("phrase"), " ", "^")))
val result = dfPostsNGrams3.join(dfWords, col("lower_phrase_concatenated") ===
col("lower_words_perm"), "inner")
.groupBy("words_perm", "words")
.agg(count("*").as("match_count"))
result.select("words", "match_count").show(false)
возвращает:
+--------------+-----------+
|words |match_count|
+--------------+-----------+
|spark scala |2 |
|users |4 |
|abc |2 |
|Stack Overflow|1 |
|xy |1 |
|Stack |3 |
|xyz |1 |
+--------------+-----------+