Spark ML (using PySpark): errors with the Multilayer Perceptron Classifier
29 September 2018

I'm working on a binary classification problem using the various classifiers available in Spark ML. I've successfully trained and evaluated several models (Naive Bayes, Random Forest, Logistic Regression, etc.), but I run into problems with the Multilayer Perceptron Classifier on the same training and test data.

Maybe SO can help me understand where I'm going wrong!

# spark version
sc.version
>>u'2.3.0.2.6.5.25-1'

# python version
import sys
print (sys.version)
>>2.7.5 (default, May  3 2017, 07:55:04) 
 [GCC 4.8.5 20150623 (Red Hat 4.8.5-14)]

# Training data
print type(training_allData),training_allData.count(),len(training_allData.columns) 
>><class 'pyspark.sql.dataframe.DataFrame'> 392836 97 

# Test data
print type(test_allData),test_allData.count(),len(test_allData.columns)
>><class 'pyspark.sql.dataframe.DataFrame'> 88862 97 

# no. of features in my training data after one-hot encoding
len(training_allData.columns)-1
>>96
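A quick check of this arithmetic, sketched in plain Python. It assumes, from the printed schema of `result` further down (the same 97 columns plus `rawPrediction`, `probability` and `prediction`), that the 97 columns include `input_label`, the indexed `label` and the assembled `features` vector itself. Purely for illustration it also assumes the elided part of the schema holds only scalar `feature_N` columns, 94 of them; the real count has to be read from the data. Under those assumptions, `len(columns) - 1` counts two columns that never end up inside the `features` vector.

```python
# Hypothetical column list: "input_label", 94 scalar feature columns
# (an assumption standing in for the "..." in the schema), the assembled
# "features" vector and the indexed "label": 97 columns in total.
columns = (["input_label"]
           + ["feature_%d" % i for i in range(1, 95)]
           + ["features", "label"])

# Columns that are not inputs to the feature vector (assumed from the schema).
NON_FEATURE_COLUMNS = {"input_label", "features", "label"}

naive_count = len(columns) - 1  # what the question computes
feature_count = len([c for c in columns if c not in NON_FEATURE_COLUMNS])

print("%d %d %d" % (len(columns), naive_count, feature_count))  # 97 96 94
```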

I'm using the code below, as described on the Spark ML documentation page:
(https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier)

from pyspark.ml.classification import MultilayerPerceptronClassifier

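# input layer size: taken here as the column count minus one (assumed: everything but the label)
inputneurons = len(training_allData.columns)-1
# one hidden layer of (n+2)/2 neurons and 2 output neurons (binary labels);
# // keeps the hidden size an integer on both Python 2 and Python 3
layers = [inputneurons, (inputneurons + 2) // 2, 2]
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model on the default "features"/"label" columns
model = trainer.fit(training_allData)
# apply the model to the test data
result = model.transform(test_allData)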
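The layer list itself is simple arithmetic. Below is a minimal sketch of building it from the length of the feature vector rather than from the column count, assuming a binary label (two output neurons) and the question's `(n + 2) / 2` hidden-layer heuristic. `mlp_layers` is a hypothetical helper name, and 94 is only an example value. Integer division `//` keeps the hidden size an int; under Python 3, `/` would return a float such as `48.0`.

```python
def mlp_layers(num_features, num_classes=2):
    """Hypothetical helper: [input, hidden, output] sizes for the MLP.

    The input layer should equal the length of each "features" vector;
    the hidden size follows the question's (n + 2) / 2 heuristic, using
    integer division so the result is an int on Python 2 and 3 alike.
    """
    if num_features < 1:
        raise ValueError("num_features must be positive")
    return [num_features, (num_features + 2) // 2, num_classes]


# With PySpark, num_features would come from the data itself, e.g.
#   len(training_allData.select("features").first()["features"])
print(mlp_layers(94))  # [94, 48, 2]
print(mlp_layers(96))  # [96, 49, 2]
```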

It fails with an "ArrayIndexOutOfBoundsException":

Py4JJavaError: An error occurred while calling o2243.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 339.0 failed 4 times, most recent failure: Lost task 4.3 in stage 339.0 (TID 9185, executor 6): java.lang.ArrayIndexOutOfBoundsException
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3$$anonfun$apply$4.apply(Layer.scala:665)
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3$$anonfun$apply$4.apply(Layer.scala:664)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3.apply(Layer.scala:664)
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3.apply(Layer.scala:660)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
at org.apache.spark.mllib.optimization.LBFGS$.runLBFGS(LBFGS.scala:195)
at org.apache.spark.mllib.optimization.LBFGS.optimize(LBFGS.scala:142)
at org.apache.spark.ml.ann.FeedForwardTrainer.train(Layer.scala:854)
at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.train(MultilayerPerceptronClassifier.scala:266)
at org.apache.spark.ml.classification.MultilayerPerceptronClassifier.train(MultilayerPerceptronClassifier.scala:143)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3$$anonfun$apply$4.apply(Layer.scala:665)
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3$$anonfun$apply$4.apply(Layer.scala:664)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3.apply(Layer.scala:664)
at org.apache.spark.ml.ann.DataStacker$$anonfun$5$$anonfun$apply$3.apply(Layer.scala:660)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
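One plausible reading of the DataStacker frames, based on the Spark 2.3 Layer.scala source rather than on anything stated in the question: before training, fit packs rows into blocks of blockSize and copies each feature vector into a flat buffer with exactly layers[0] slots per row. If a vector is shorter than layers[0], that copy reads past the end of the vector and Java raises ArrayIndexOutOfBoundsException; if it is longer, the extra values are silently dropped and fit succeeds. The pure-Python imitation below (`stack_block` is a hypothetical name, not Spark's API) reproduces that behaviour.

```python
def stack_block(vectors, input_size):
    """Pure-Python imitation (an assumption, modelled on Spark's DataStacker)
    of packing one block of feature vectors into a flat buffer that holds
    exactly input_size values per row."""
    buf = [0.0] * (input_size * len(vectors))
    for i, vec in enumerate(vectors):
        if len(vec) < input_size:
            # In Spark, System.arraycopy fails here with
            # java.lang.ArrayIndexOutOfBoundsException.
            raise IndexError("vector has %d values but layers[0] = %d"
                             % (len(vec), input_size))
        # Only the first input_size values are copied; any extra are dropped.
        buf[i * input_size:(i + 1) * input_size] = vec[:input_size]
    return buf


block = [[1.0] * 94, [2.0] * 94]       # two rows with 94 features each
print(len(stack_block(block, 93)))      # 186: the fit-like step "succeeds"
try:
    stack_block(block, 96)              # layers[0] larger than the vector
except IndexError as err:
    print("IndexError: %s" % err)       # IndexError: vector has 94 values but layers[0] = 96
```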

However, the code seems to work for input-layer sizes below 94: I was able to train the model and evaluate it on the test data. The "result" DataFrame is created, but I can't access the underlying data.

print result.count(),len(result.columns)
>>88862  100

type(result)
>><class 'pyspark.sql.dataframe.DataFrame'>

result.printSchema()
>>root
  |-- input_label: long (nullable = true)
  |-- feature_1: double (nullable = true)
  |-- feature_2: long (nullable = true)
  |-- feature_3: long (nullable = true)
  |-- feature_4: double (nullable = true)
  |...
  |...
  |-- features: vector (nullable = true)
  |-- label: double (nullable = false)
  |-- rawPrediction: vector (nullable = true)
  |-- probability: vector (nullable = true)
  |-- prediction: double (nullable = false)

result.select("label","prediction","probability").show(10)

Error:

An error occurred while calling o1809.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 325.0 failed 4 times, most recent failure: Lost task 0.3 in stage 325.0 (TID 8700, executor 18): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.lang.IllegalArgumentException: requirement failed: A & B Dimension mismatch!
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.ml.ann.BreezeUtil$.dgemm(BreezeUtil.scala:41)
at org.apache.spark.ml.ann.AffineLayerModel.eval(Layer.scala:164)
at org.apache.spark.ml.ann.FeedForwardModel.forward(Layer.scala:508)
at org.apache.spark.ml.ann.FeedForwardModel.predictRaw(Layer.scala:561)
at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:343)
at org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel.predictRaw(MultilayerPerceptronClassifier.scala:300)
at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:117)
at org.apache.spark.ml.classification.ProbabilisticClassificationModel$$anonfun$1.apply(ProbabilisticClassifier.scala:116)
... 19 more
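The second trace fits the same picture. DataFrame transformations are lazy, so transform only builds a plan and nothing runs until show triggers the prediction UDF. At that point each full feature vector is multiplied by the first layer's weight matrix, which was sized for layers[0] inputs, and BreezeUtil.dgemm rejects the mismatch. A pure-Python sketch of that check, where `affine_forward` is a hypothetical stand-in for an affine layer rather than Spark's API:

```python
def affine_forward(weights, bias, x):
    """Hypothetical stand-in for an affine layer: returns weights * x + bias.

    weights has one row per output neuron and one column per input neuron;
    like Spark's dgemm wrapper, it refuses inputs of the wrong length.
    """
    if any(len(row) != len(x) for row in weights):
        raise ValueError("A & B Dimension mismatch!")
    return [sum(w * xi for w, xi in zip(row, x)) + b
            for row, b in zip(weights, bias)]


# A first layer trained with layers[0] = 3 (3 inputs, 2 hidden neurons).
W = [[1.0, 0.0, 0.0],
     [0.0, 1.0, 1.0]]
b = [0.5, -0.5]

print(affine_forward(W, b, [1.0, 2.0, 3.0]))    # [1.5, 4.5]
try:
    affine_forward(W, b, [1.0, 2.0, 3.0, 4.0])  # full vector is longer
except ValueError as err:
    print(err)                                  # A & B Dimension mismatch!
```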

PS: I'm relatively new to neural networks, so I've read through several SO posts and tried their solutions as well, but none of them works in my case!

1 Answer

08 August 2019

You need to match the first layer to the exact number of features. I'd say your first layer has more neurons than there are input features. Please check the number of features you pass in inputneurons.
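Following that advice, a minimal sketch of a pre-fit sanity check, assuming a sample of feature vectors has been collected as plain Python lists (in PySpark, each row's features vector supports len). `check_input_layer` is a hypothetical helper, not part of Spark.

```python
def check_input_layer(layers, feature_vectors):
    """Hypothetical pre-fit sanity check for a multilayer perceptron.

    Returns the feature count, or raises ValueError unless every sampled
    feature vector has exactly layers[0] values.
    """
    sizes = set(len(v) for v in feature_vectors)
    if len(sizes) != 1:
        raise ValueError("feature vectors have differing lengths: %s"
                         % sorted(sizes))
    num_features = sizes.pop()
    if layers[0] != num_features:
        raise ValueError("layers[0] = %d but each feature vector has %d values"
                         % (layers[0], num_features))
    return num_features


vectors = [[0.1] * 94, [0.2] * 94]
print(check_input_layer([94, 48, 2], vectors))  # 94
try:
    check_input_layer([96, 49, 2], vectors)
except ValueError as err:
    print(err)  # layers[0] = 96 but each feature vector has 94 values
```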

...