I am trying to run LDA on a German text file. At first I mistakenly set the stop words to English,
stopwords.setStopWords(StopWordsRemover.loadDefaultStopWords("english"))
and the code ran fine.
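(For reference, both lists ship with Spark and load the same way, so the list itself can be inspected directly, e.g.:)

from pyspark.ml.feature import StopWordsRemover

# both default lists are bundled with Spark and loaded by language name
print(len(StopWordsRemover.loadDefaultStopWords("english")))
print(len(StopWordsRemover.loadDefaultStopWords("german")))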
After noticing the mistake, I replaced it with the German stop words,
stopwords.setStopWords(StopWordsRemover.loadDefaultStopWords("german"))
and now I get the error below:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
Py4JJavaError: An error occurred while calling o219.fit.
: java.lang.IllegalArgumentException: requirement failed: Vectors must have same length: x.length == y.length (0 != 2)
at breeze.linalg.DenseVector$canDaxpy$.apply(DenseVector.scala:635)
at breeze.linalg.DenseVector$$anon$4.apply(DenseVector.scala:627)
at breeze.linalg.DenseVector$$anon$4.apply(DenseVector.scala:625)
at breeze.linalg.operators.DenseVector_GenericOps$$anon$308.apply(DenseVectorOps.scala:712)
at breeze.linalg.operators.DenseVector_GenericOps$$anon$308.apply(DenseVectorOps.scala:709)
at breeze.linalg.NumericOps$class.$plus(NumericOps.scala:186)
at breeze.linalg.DenseVector.$plus(DenseVector.scala:53)
at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.updateAlpha(LDAOptimizer.scala:517)
at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.submitMiniBatch(LDAOptimizer.scala:489)
at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.next(LDAOptimizer.scala:449)
at org.apache.spark.mllib.clustering.OnlineLDAOptimizer.next(LDAOptimizer.scala:262)
at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:336)
at org.apache.spark.ml.clustering.LDA.fit(LDA.scala:912)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
IllegalArgumentException Traceback (most recent call last)
<ipython-input-11-e6867094be6d> in <module>
----> 1 compute_topic(df, 2)
<ipython-input-10-154534ae15d3> in compute_topic(data, num_topics)
26 lda = LDA(k=num_topics, maxIter=500)
27
---> 28 lda_model = lda.fit(libsvm)
29
30 # saving models
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/ml/base.py in fit(self, dataset, params)
62 return self.copy(params)._fit(dataset)
63 else:
---> 64 return self._fit(dataset)
65 else:
66 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/ml/wrapper.py in _fit(self, dataset)
263
264 def _fit(self, dataset):
--> 265 java_model = self._fit_java(dataset)
266 return self._create_model(java_model)
267
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
260 """
261 self._transfer_params_to_java()
--> 262 return self._java_obj.fit(dataset._jdf)
263
264 def _fit(self, dataset):
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/opt/cray/spark2/2.2_kubernetes.0000.201808240356_0027/python/pyspark/sql/utils.py in deco(*a, **kw)
77 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
78 if s.startswith('java.lang.IllegalArgumentException: '):
---> 79 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
80 raise
81 return deco
IllegalArgumentException: 'requirement failed: Vectors must have same length: x.length == y.length (0 != 2)'
If I read the message correctly, a zero-length vector is being added to a vector of length 2 (my number of topics), but I don't see where an empty vector could come from. Here is my code:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
from pyspark.ml.feature import IDF, IDFModel
from pyspark.ml.feature import NGram
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()
# Prepare dataset
vocabSize = 10000

# split on whitespace and punctuation, lowercase, drop tokens shorter than 5 characters
tokenizer = RegexTokenizer()
tokenizer.setInputCol("value")
tokenizer.setOutputCol("rawTokens")
tokenizer.setMinTokenLength(5)
tokenizer.setPattern("\\s+|\\/|_|-|\\.|,|\"|:")
tokenizer.setToLowercase(True)
tokenizer.setGaps(True)

# remove stop words ("german" is the change that triggers the error above)
stopwords = StopWordsRemover()
stopwords.setInputCol(tokenizer.getOutputCol())
stopwords.setOutputCol("tokens")
stopwords.setStopWords(StopWordsRemover.loadDefaultStopWords("german"))

# term frequencies over a vocabulary of at most vocabSize terms
termFrequency = CountVectorizer()
termFrequency.setInputCol(stopwords.getOutputCol())
termFrequency.setOutputCol("tf_features")
termFrequency.setVocabSize(vocabSize)

# TF-IDF features used as LDA input
idf = IDF()
idf.setInputCol(termFrequency.getOutputCol())
idf.setOutputCol("tfidf_features")

stages = [tokenizer, stopwords, termFrequency, idf]
pipeline = Pipeline(stages=stages)
from pyspark.ml.linalg import Vector as MLVector, Vectors as MLVectors
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.sql.types import StringType
def compute_topic(data, num_topics):
    training_data, test_data = data.randomSplit([0.7, 0.3])

    pipeline_model = pipeline.fit(training_data)
    frequencyMatrix = pipeline_model.transform(training_data)

    # generate sparse vectors for each document
    datapoint = frequencyMatrix.select("tfidf_features") \
        .rdd \
        .map(lambda features: MLVectors.sparse(vocabSize, features)) \
        .zipWithIndex() \
        .map(lambda x: (x[1], x[0]))

    # convert again from RDD to DataFrame
    libsvm = spark.createDataFrame(datapoint, ['label', 'features'])
    libsvm.show()

    # perform LDA with default values
    lda = LDA(k=num_topics, maxIter=500)

    lda_model = lda.fit(libsvm)

# read the German text file and drop empty lines
target = 'file:///target.text'
rdd_line = sc.textFile(target)
llist = rdd_line.collect()
df = spark.createDataFrame(llist, StringType())
df = df.filter(df.value != '')

compute_topic(df, 2)
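One way a zero-length vector could appear is if some documents lose all their tokens once the German stop words and the 5-character minimum token length are applied. A quick diagnostic sketch (reusing the column names from the pipeline above) to count such documents:

from pyspark.sql.functions import col, size

# re-run the feature pipeline on the full input and count documents whose
# token list is empty after stop word removal
transformed = pipeline.fit(df).transform(df)
empty_docs = transformed.filter(size(col("tokens")) == 0).count()
print("documents with no tokens left:", empty_docs)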