pyspark LDA fails with "german" stopwords

I am trying to run LDA on a German text file. However, at first I mistakenly set the stop words to English,

stopwords.setStopWords(StopWordsRemover.loadDefaultStopWords("english"))

, and the code ran fine.

Then I noticed this mistake and replaced them with the German stop words,

stopwords.setStopWords(StopWordsRemover.loadDefaultStopWords("german"))

, and then the error shown below appeared:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o219.fit.
: java.lang.IllegalArgumentException: requirement failed: Vectors must have same length: x.length == y.length (0 != 2)
    at breeze.linalg.DenseVector$canDaxpy$.apply(DenseVector.scala:635)
    at breeze.linalg.DenseVector$$anon$4.apply(DenseVector.scala:627)
    at breeze.linalg.DenseVector$$anon$4.apply(DenseVector.scala:625)
    at breeze.linalg.operators.DenseVector_GenericOps$$anon$308.apply(DenseVectorOps.scala:712)
    at breeze.linalg.operators.DenseVector_GenericOps$$anon$308.apply(DenseVectorOps.scala:709)
    at breeze.linalg.NumericOps$class.$plus(NumericOps.scala:186)
    at breeze.linalg.DenseVector.$plus(DenseVector.scala:53)
    at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.updateAlpha(LDAOptimizer.scala:517)
    at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.submitMiniBatch(LDAOptimizer.scala:489)
    at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.next(LDAOptimizer.scala:449)
    at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.next(LDAOptimizer.scala:262)
    at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:336)
    at org.apache.spark.ml.clustering.LDA.fit(LDA.scala:912)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-11-e6867094be6d> in <module>
----> 1 compute_topic(df, 2)

<ipython-input-10-154534ae15d3> in compute_topic(data, num_topics)
     26     lda = LDA(k=num_topics, maxIter=500)
     27 
---> 28     lda_model = lda.fit(libsvm)
     29 
     30     # saving models

/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    263 
    264     def _fit(self, dataset):
--> 265         java_model = self._fit_java(dataset)
    266         return self._create_model(java_model)
    267 

/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    260         """
    261         self._transfer_params_to_java()
--> 262         return self._java_obj.fit(dataset._jdf)
    263 
    264     def _fit(self, dataset):

/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: 'requirement failed: Vectors must have same length: x.length == y.length (0 != 2)'
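
For what it's worth, loadDefaultStopWords returns a plain Python list for both languages, so loading the German list itself does not seem to be the problem (a quick check, the variable names are mine):

from pyspark.ml.feature import StopWordsRemover

en_stop = StopWordsRemover.loadDefaultStopWords("english")
de_stop = StopWordsRemover.loadDefaultStopWords("german")
print(len(en_stop), len(de_stop))  # both lists are non-empty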

Here is my code:

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
from pyspark.ml.feature import IDF, IDFModel
from pyspark.ml.feature import NGram
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
# SparkSession used further down for createDataFrame
spark = SparkSession.builder.getOrCreate()
# Prepare dataset
vocabSize = 10000

tokenizer = RegexTokenizer()
tokenizer.setInputCol("value")
tokenizer.setOutputCol("rawTokens")
tokenizer.setMinTokenLength(5)
tokenizer.setPattern("\\s+|\\/|_|-|\\.|,|\"|:")
tokenizer.setToLowercase(True)
tokenizer.setGaps(True)


# this is the only stage I change between the working run and the failing one
stopwords = StopWordsRemover()
stopwords.setInputCol(tokenizer.getOutputCol())
stopwords.setOutputCol("tokens")
stopwords.setStopWords(StopWordsRemover.loadDefaultStopWords("english"))

termFrequency = CountVectorizer()
termFrequency.setInputCol(stopwords.getOutputCol())
termFrequency.setOutputCol("tf_features")
termFrequency.setVocabSize(vocabSize)


idf = IDF()
idf.setInputCol(termFrequency.getOutputCol())
idf.setOutputCol("tfidf_features")

# full pipeline: value -> rawTokens -> tokens -> tf_features -> tfidf_features
stages = [tokenizer, stopwords, termFrequency, idf]
pipeline = Pipeline(stages=stages)

from pyspark.ml.linalg import Vector as MLVector, Vectors as MLVectors
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.sql.types import StringType

def compute_topic(data, num_topics):

    training_data, test_data = data.randomSplit([0.7, 0.3])

    pipeline_model = pipeline.fit(training_data)

    frequencyMatrix = pipeline_model.transform(training_data)
    # generate sparse vectors for each document
    datapoint = frequencyMatrix.select("tfidf_features")\
        .rdd\
        .map(lambda features: MLVectors.sparse(vocabSize, features))\
        .zipWithIndex()\
        .map(lambda x: (x[1], x[0]))
    # convert again from rdd to DataFrame
    libsvm = spark.createDataFrame(datapoint, ['label', 'features'])
    libsvm.show()
    # perform LDA with default values
    lda = LDA(k=num_topics, maxIter=500)

    lda_model = lda.fit(libsvm)

    # saving models
    ...

target = 'file:///target.text'
rdd_line = sc.textFile(target)
llist = rdd_line.collect()
df = spark.createDataFrame(llist, StringType())
df = df.filter(df.value != '')

compute_topic(df, 2)
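
Since the only thing that changes between the working run and the failing one is the stop-word list, my guess is that some documents lose all of their tokens after the German pass (setMinTokenLength(5) already drops a lot), so LDA ends up with empty rows. A quick way to check this on the transformed data (just a sketch that reuses the pipeline and df defined above; size comes from pyspark.sql.functions):

from pyspark.sql.functions import col, size

pipeline_model = pipeline.fit(df)
tokens_df = pipeline_model.transform(df)
# count documents that keep no tokens at all after tokenizing + stop-word removal
print(tokens_df.filter(size(col("tokens")) == 0).count())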