I am trying to implement a forecasting algorithm for electricity production that takes several data sources into account (historical production data, weather data, ...).
My problem is that as soon as my models are trained I get a Java OutOfMemoryError. I assume it comes from another part of the code, since I am not dealing with large volumes of data.
I already had this working with a single feature, but now that I try to take several inputs into account I cannot get it to work.
Here is how my input dataframe is organised:
,features,target[0],target[1],target[2],target[3],target[4],target[5],target[6],Perimetre
0,"[418.0,189.0,83.0,57.0,79.0,67.0,80.0,47.0,47.6,51.2,49.6,47.2,45.0,45.8]",119.0,51.0,148.0,427.0,404.0,266.0,180.0,Bretagne
I have 2 windows of 7 values each in there. Should the input be a vector of vectors, i.e. [[window for feature 1], [window for feature 2]], or is my dataframe already organised correctly, i.e. [window for feature 1, window for feature 2]?
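As far as I know, Spark ML estimators expect featuresCol to be a single flat Vector column, so a 14-element vector holding both windows back to back (as in the row above) should be the right shape. Below is only a simplified sketch of flattening two windows into one vector; the column names window_prod and window_weather are made up for the example.

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

spark = SparkSession.builder.getOrCreate()

# Toy row with two 7-value windows (hypothetical column names).
df = spark.createDataFrame(
    [([418.0, 189.0, 83.0, 57.0, 79.0, 67.0, 80.0],
      [47.0, 47.6, 51.2, 49.6, 47.2, 45.0, 45.8],
      "Bretagne")],
    ["window_prod", "window_weather", "Perimetre"])

# Concatenate the two windows into the single flat DenseVector that Spark ML expects.
to_vector = udf(lambda w1, w2: Vectors.dense(list(w1) + list(w2)), VectorUDT())
df = df.withColumn("features", to_vector("window_prod", "window_weather"))
df.show(truncate=False)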
Here is how I create and train my models:
def fit(self, train_data, use_old_models):
    for i in range(self.nb_models):
        print("\n-------------------------------------\n")
        print("Model n° {0}".format(i))
        if use_old_models:
            try:
                RFM.load("{0}_{1}".format(self.models_weights, i))
                print("model already exists here : {0}".format("{0}_{1}\\".format(self.models_weights, i)))
            except:
                print("creating and training model")
                rf = RF(featuresCol='features', labelCol='target[{0}]'.format(i))
                model = rf.fit(train_data)
                model.save("{0}_{1}".format(self.models_weights, i))
        else:
            print("creating and training model")
            rf = RF(featuresCol='features', labelCol='target[{0}]'.format(i))
            model = rf.fit(train_data)
            model.save("{0}_{1}".format(self.models_weights, i))

def forecast(self, test_data):
    results = test_data.select("Perimetre")
    for i in range(self.nb_models):
        model = RFM.load("{0}_{1}".format(self.models_weights, i))
        transformed = model.transform(test_data).select('prediction', "Perimetre") \
                           .withColumnRenamed('prediction', 'prediction[{0}]'.format(i))
        # Joining the prediction of the ith model to a DF containing all predictions
        results = results.join(transformed, "Perimetre")
    # Extracting the prediction columns to be printed
    prediction_exprs = ["prediction[{0}]".format(i) for i in range(self.nb_models)]
    predictions = results.select(*prediction_exprs)
    return predictions
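In case it is relevant: results = results.join(...) inside the loop builds a single query plan that re-reads test_data and re-applies one model per iteration, so the plan that toPandas() eventually executes grows with nb_models. Below is only a sketch of how the intermediate result could be materialised periodically to keep that plan small; it is not a claim about the root cause. It assumes RFM is pyspark.ml.regression.RandomForestRegressionModel and that a checkpoint directory has been set with spark.sparkContext.setCheckpointDir(...); the interval of 5 is arbitrary.

from pyspark.ml.regression import RandomForestRegressionModel as RFM

def forecast(self, test_data):
    results = test_data.select("Perimetre")
    for i in range(self.nb_models):
        model = RFM.load("{0}_{1}".format(self.models_weights, i))
        transformed = (model.transform(test_data)
                            .select("prediction", "Perimetre")
                            .withColumnRenamed("prediction", "prediction[{0}]".format(i)))
        results = results.join(transformed, "Perimetre")
        if (i + 1) % 5 == 0:
            # Materialise and truncate the lineage so the plan stops growing;
            # requires spark.sparkContext.setCheckpointDir(...) to have been called.
            results = results.checkpoint(eager=True)
    prediction_exprs = ["prediction[{0}]".format(i) for i in range(self.nb_models)]
    return results.select(*prediction_exprs)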
Here is the error I get:
19/05/27 12:21:10 ERROR Executor: Exception in task 1.0 in stage 24151.0 (TID 17047)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
19/05/27 12:21:10 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 17047,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
19/05/27 12:21:10 ERROR TaskSetManager: Task 1 in stage 24151.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "PowercasterLoc.py", line 123, in <module>
    dataframe_prediction = predictions.toPandas()
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\pyspark\sql\dataframe.py", line 2142, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\pyspark\sql\dataframe.py", line 533, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2848.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 24151.0 failed 1 times, most recent failure: Lost task 1.0 in stage 24151.0 (TID 17047, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Any ideas or suggestions about what is wrong here?
Thanks!