Spark Сравните два кадра данных и найдите количество совпадений - PullRequest
0 голосов
/ 29 сентября 2018

У меня есть две таблицы данных spark sql, у которых нет ни одного уникального столбца.Первый фрейм данных содержит n граммов, второй - длинную текстовую строку (запись в блоге).Я хочу найти совпадения на df2 и добавить счетчик в df1.

DF1
------------
words
------------
Stack
Stack Overflow
users
spark scala

DF2

--------
POSTS
--------
Hello, Stack overflow users , Do you know spark scala
Spark scala is very fast
Users in stack are good in spark, users


Expected output

  ------------     ---------------
    words            match_count
  ------------    ---------------

    Stack               2           
    Stack Overflow      1
    users               3
    spark scala         1

Ответы [ 3 ]

0 голосов
/ 29 сентября 2018

Вы можете использовать функции панд в pyspark.Вот мое решение ниже

>>> from pyspark.sql import Row
>>> import pandas as pd
>>> 
>>> rdd1 = sc.parallelize(['Stack','Stack Overflow','users','spark scala'])
>>> data1 = rdd1.map(lambda x: Row(x))
>>> df1=spark.createDataFrame(data1,['words'])
>>> df1.show()
+--------------+
|         words|
+--------------+
|         Stack|
|Stack Overflow|
|         users|
|   spark scala|
+--------------+

>>> rdd2 = sc.parallelize([
...     'Hello, Stack overflow users , Do you know spark scala',
...     'Spark scala is very fast',
...     'Users in stack are good in spark'
...     ])
>>> data2 = rdd2.map(lambda x: Row(x))
>>> df2=spark.createDataFrame(data2,['posts'])
>>> df2.show()
+--------------------+
|               posts|
+--------------------+
|Hello, Stack over...|
|Spark scala is ve...|
|Users in stack ar...|
+--------------------+

>>> dfPd1 = df1.toPandas()
>>> dfPd2 = df2.toPandas().apply(lambda x: x.str.lower())
>>> 
>>> words = dict((x,0) for x in dfPd1['words'])
>>> 
>>> for i in words:
...     x = dfPd2['posts'].str.contains(i.lower()).sum()
...     if i in words:
...         words[i] = x
... 
>>> 
>>> words
{'Stack': 2, 'Stack Overflow': 1, 'users': 2, 'spark scala': 2}
>>> 
>>> data = pd.DataFrame.from_dict(words, orient='index').reset_index()
>>> data.columns = ['words','match_count']
>>> 
>>> df = spark.createDataFrame(data)
>>> df.show()
+--------------+-----------+
|         words|match_count|
+--------------+-----------+
|         Stack|          2|
|Stack Overflow|          1|
|         users|          2|
|   spark scala|          2|
+--------------+-----------+
0 голосов
/ 01 октября 2018

Подход грубой силы в Scala, который не работает над строками и рассматривает все как строчные буквы, может быть добавлен, но это для другого дня.Полагается не на то, чтобы пытаться исследовать строки, а на том, чтобы определить нграммы такими, какие они есть, нграммы против нграмм и генерировать их, а затем объединять и считать, причем внутреннее соединение имеет значение только для них.Добавлены некоторые дополнительные данные для подтверждения соответствия.

import org.apache.spark.ml.feature._
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.functions._  
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType,ArrayType,LongType,StringType}
import spark.implicits._

// Sample data, duplicates and items to check it works.
val dfPostsInit = Seq(
                  ( "Hello!!, Stack overflow users, Do you know spark scala users."),
                  ( "Spark scala is very fast,"),
                  ( "Users in stack are good in spark"),
                  ( "Users in stack are good in spark"),
                  ( "xy z"),
                  ( "x yz"),
                  ( "ABC"),
                  ( "abc"),
                  ( "XYZ,!!YYY@#$ Hello Bob..."))
                 .toDF("posting")

val dfWordsInit = Seq(("Stack"), ("Stack Overflow"),("users"), ("spark scala"), ("xyz"), ("xy"), ("not found"), ("abc")).toDF("words")
val dfWords     = dfWordsInit.withColumn("words_perm" ,regexp_replace(dfWordsInit("words"), " ", "^")).withColumn("lower_words_perm" ,lower(regexp_replace(dfWordsInit("words"), " ", "^")))

val dfPostsTemp = dfPostsInit.map(r => (r.getString(0), r.getString(0).split("\\W+").toArray )) 
// Tidy Up
val columnsRenamed = Seq("posting", "posting_array") 
val dfPosts = dfPostsTemp.toDF(columnsRenamed: _*)

// Generate Ngrams up to some limit N - needs to be set. This so that we can count properly via a JOIN direct comparison. Can parametrize this in calls below.
// Not easy to find string matching over Array and no other answer presented.
def buildNgrams(inputCol: String = "posting_array", n: Int = 3) = {
  val ngrams = (1 to n).map(i =>
      new NGram().setN(i)
        .setInputCol(inputCol).setOutputCol(s"${i}_grams")
  )
  new Pipeline().setStages((ngrams).toArray)
}

val suffix:String = "_grams"
var i_grams_Cols:List[String] = Nil
for(i <- 1 to 3) {
   val iGCS = i.toString.concat(suffix)
   i_grams_Cols = i_grams_Cols ::: List(iGCS)
}     
// Generate data for checking against later from via rows only and thus not via columns, positional dependency counts, hence permutations. 
val dfPostsNGrams = buildNgrams().fit(dfPosts).transform(dfPosts)

val dummySchema = StructType(
    StructField("phrase", StringType, true) :: Nil)
var dfPostsNGrams2 = spark.createDataFrame(sc.emptyRDD[Row], dummySchema)
for (i <- i_grams_Cols) {
  val nameCol = col({i})
  dfPostsNGrams2 = dfPostsNGrams2.union (dfPostsNGrams.select(explode({nameCol}).as("phrase")).toDF )
 }

val dfPostsNGrams3     = dfPostsNGrams2.withColumn("lower_phrase_concatenated",lower(regexp_replace(dfPostsNGrams2("phrase"), " ", "^"))) 

val result = dfPostsNGrams3.join(dfWords, col("lower_phrase_concatenated") === 
col("lower_words_perm"), "inner")  
              .groupBy("words_perm", "words")
              .agg(count("*").as("match_count"))

result.select("words", "match_count").show(false)

возвращает:

+--------------+-----------+
|words         |match_count|
+--------------+-----------+
|spark scala   |2          |
|users         |4          |
|abc           |2          |
|Stack Overflow|1          |
|xy            |1          |
|Stack         |3          |
|xyz           |1          |
+--------------+-----------+
0 голосов
/ 29 сентября 2018

Кажется, что join-groupBy-count сделает:

df1
    .join(df2, expr("lower(posts) rlike lower(words)"))
    .groupBy("words")
    .agg(count("*").as("match_count"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...