Как сделать нечеткое совпадение в PySpark UDF? - PullRequest
0 голосов
/ 13 января 2020

Я пытаюсь запустить следующий код, чтобы создать дополнительный столбец в pyspark df. Идея состоит в том, чтобы взять col из pyspark df и получить максимальное количество баллов, сравнив col со списком ключевых слов, которые у меня есть. (например, варианты)

def get_max_sore(col):
    choices = ['hello','hello world','world hello']
    return max(process.extractOne(col, choices, scorer=fuzz.token_sort_ratio), process.extractOne(col, choices, scorer=fuzz.token_set_ratio), process.extractOne(col, choices))

from fuzzywuzzy import fuzz
from fuzzywuzzy import process
from pyspark.sql.functions import udf
get_max_udf = udf(get_max_sore, StringType())
sdf_1 = sparkDf.withColumn('new_col', get_max_udf(sparkDf.col))
sdf_1.show()

Последнее утверждение sdf_1.show() дает мне ошибку.

Py4JJavaError: An error occurred while calling o1972.showString.
.
.
.
.
ModuleNotFoundError: No module named 'fuzzywuzzy'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

Это первый раз, когда я работаю со свечой и как она работает. Пожалуйста помоги. Какие функции я могу использовать, чтобы выполнить то же самое, например, нечеткое сопоставление значения col с choices = ['hello','hello world','world hello']. Кроме того, нечеткий пакет установлен во всех узлах.

...