Я использую PySpark и Elephas, но в данный момент это не работает. Я попробовал пример, приведенный на Elephas do c Github. Обратите внимание, что в консоли PySpark мой код с Keras и Pandas работает (но без использования библиотеки PySpark). Но пример, приведенный на https://github.com/maxpumperla/elephas для взаимодействия Keras и библиотеки PySpark с Elephas, не работает, и я вообще не знаю, как решить эту проблему. Вся моя конфигурация PySpark использует Python 3.7
Вот содержимое моего скрипта и сообщение об ошибке:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('Elephas_App').setMaster('local[4]')
#ici local[4] indique qu'on execute l'application Elephas_app sur la machine locale seule et avec 4 coeurs
sc = SparkContext(conf=conf)
#Chargement des packages
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer=SGD())
##INTEGRATION ELEPHAS
from elephas.utils.rdd_utils import to_simple_rdd
rdd = to_simple_rdd(sc, x_train, y_train)
from elephas.spark_model import SparkModel
spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
И сообщение об ошибке:
>>> Distribute load
Traceback (most recent call last):
File "/home/admin-tv/deeplearning/elephas_ann.py", line 100, in <module>
spark_model.fit(rdd, epochs=20, batch_size=32, verbose=0, validation_split=0.1)
File "/usr/local/lib/python3.7/dist-packages/elephas/spark_model.py", line 151, in fit
self._fit(rdd, epochs, batch_size, verbose, validation_split)
File "/usr/local/lib/python3.7/dist-packages/elephas/spark_model.py", line 182, in _fit
rdd.mapPartitions(worker.train).collect()
File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in collect
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.IllegalArgumentException: Unsupported class file major version 55
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)