При сопоставлении строк с использованием конвейера ML возникла ошибка. Не удалось выполнить пользовательскую функцию ($ anonfun $ 1: (вектор) => массив <vector>) - PullRequest
0 голосов
/ 11 апреля 2019

Я пытаюсь сопоставить строки на двух данных.Допустим, dataframe1 содержит X предложений, а dataframe2 Y предложений.Мне нужно проверить, совпадает ли любое предложение из Dataframe1 с Dataframe2.Я пытаюсь использовать трубопровод ML следующим образом:

def match_names(df_1, df_2):


    pipeline = Pipeline(stages=[
        RegexTokenizer(
            pattern="", inputCol="name", outputCol="tokens", minTokenLength=1
        ),
        NGram(n=3, inputCol="tokens", outputCol="ngrams"),
        HashingTF(inputCol="ngrams", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh")
    ])

    model = pipeline.fit(df_1)

    stored_hashed = model.transform(df_1)
    landed_hashed = model.transform(df_2)

    matched_df = model.stages[-1].approxSimilarityJoin(stored_hashed, landed_hashed, 1.0, "confidence").select(
        col("datasetA.name"), col("datasetB.name"), col("confidence"))
    matched_df.show(20, False)

Я получаю ошибку ниже:

Py4JJavaError: An error occurred while calling o4136.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 181.0 failed 1 times, most recent failure: Lost task 0.0 in stage 181.0 (TID 899, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => array<vector>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.project_doConsume$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.ml.feature.MinHashLSHModel$$anonfun$1.apply(MinHashLSH.scala:57)
    at org.apache.spark.ml.feature.MinHashLSHModel$$anonfun$1.apply(MinHashLSH.scala:56)
    ... 19 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at sun.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => array<vector>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.project_doConsume$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.lang.IllegalArgumentException: requirement failed: Must have at least 1 non zero entry.
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.ml.feature.MinHashLSHModel$$anonfun$1.apply(MinHashLSH.scala:57)
    at org.apache.spark.ml.feature.MinHashLSHModel$$anonfun$1.apply(MinHashLSH.scala:56)
    ... 19 more

Не уверен, что я делаю неправильно!(

1 Ответ

0 голосов
/ 13 мая 2019

Это деталь реализации с minHashLSH. Из документации PySpark:

Класс LSH для расстояния Джакарда. Ввод может быть плотным или разреженным векторы, но это более эффективно, если оно редкое. Например, Vectors.sparse (10, [(2, 1.0), (3, 1.0), (5, 1.0)]) означает, что существует 10 элементы в пространстве. Этот набор содержит элементы 2, 3 и 5. Также любой входной вектор должен иметь как минимум 1 ненулевой индекс, и все ненулевые значения обрабатываются как двоичные значения «1».

Ожидается, что векторы будут иметь хотя бы один ненулевой элемент в наборе. Таким образом, вполне вероятно, что ваш RegexTokenizer + NGram выплевывает ngrams, что является полным нулевым вектором.

Попробуйте исключить все строки, где vectors является нулевым набором, прежде чем передавать его на minHashLSH. К сожалению, это будет означать, что вам придется извлечь его из конвейера и запустить отдельно.

...