Multi-feature training problem with Spark MLlib
0 votes
27 May 2019

I am trying to implement a forecasting algorithm to predict electricity production, given several data sources (historical production data, weather data, ...). My problem is that once my models are trained, I get a java.lang.OutOfMemoryError: Java heap space. I suspect it comes from another part of the code, since I am not dealing with large volumes of data.

I already got this working with a single feature, but now that I try to take several inputs into account, I cannot get it to work.

Here is how my input dataframe is organized:

,features,target[0],target[1],target[2],target[3],target[4],target[5],target[6],Perimetre
0,"[418.0,189.0,83.0,57.0,79.0,67.0,80.0,47.0,47.6,51.2,49.6,47.2,45.0,45.8]",119.0,51.0,148.0,427.0,404.0,266.0,180.0,Bretagne

There I have 2 windows of 7 values each. Should the input be a vector of vectors: [[window for feature 1], [window for feature 2]], or is my dataframe already organized correctly: [window for feature 1, window for feature 2]?
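
For reference, this is roughly how I build the "features" column today, concatenating the two 7-value windows into one flat vector with VectorAssembler (the column names here are illustrative, not my real ones):

    from pyspark.ml.feature import VectorAssembler

    # One numeric column per lag in each window; VectorAssembler
    # concatenates them all into the single "features" Vector column.
    window_cols = (["prod_lag_{0}".format(i) for i in range(1, 8)] +
                   ["temp_lag_{0}".format(i) for i in range(1, 8)])
    assembler = VectorAssembler(inputCols=window_cols, outputCol="features")
    train_data = assembler.transform(raw_df)  # raw_df holds the unassembled lag columns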

Here is how I create and train my models (RF and RFM below are my aliases for RandomForestRegressor and RandomForestRegressionModel from pyspark.ml.regression):

    def fit(self, train_data, use_old_models):
        for i in range(self.nb_models):

            print("\n-------------------------------------\n")
            print("Model n° {0}".format(i))
            if use_old_models:
                try:
                    # Load only to check whether a saved model already exists
                    RFM.load("{0}_{1}".format(self.models_weights, i))
                    print("model already exists here: {0}_{1}".format(self.models_weights, i))
                except Exception:
                    print("creating and training model")
                    rf = RF(featuresCol="features", labelCol="target[{0}]".format(i))
                    model = rf.fit(train_data)
                    model.save("{0}_{1}".format(self.models_weights, i))
            else:
                print("creating and training model")
                rf = RF(featuresCol="features", labelCol="target[{0}]".format(i))
                model = rf.fit(train_data)
                model.save("{0}_{1}".format(self.models_weights, i))


    def forecast(self, test_data):
        results = test_data.select("Perimetre")
        for i in range(self.nb_models):
            model = RFM.load("{0}_{1}".format(self.models_weights, i))
            transformed = model.transform(test_data).select("prediction", "Perimetre") \
                               .withColumnRenamed("prediction", "prediction[{0}]".format(i))
            # Joining the prediction of the ith model to a DF containing all predictions
            results = results.join(transformed, "Perimetre")

        # Extracting the prediction columns to be printed
        prediction_exprs = ["prediction[{0}]".format(i) for i in range(self.nb_models)]
        predictions = results.select(*prediction_exprs)
        return predictions
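
For context, each pass of this loop joins onto the result of the previous pass, so by the last model the plan behind "results" carries the whole chain of joins; the stage number in the log below (24151) makes me suspect the plan has grown enormous. One variant I have been considering (untested) truncates the lineage after each join; it assumes a checkpoint directory has been set with spark.sparkContext.setCheckpointDir(...):

    def forecast(self, test_data):
        results = test_data.select("Perimetre")
        for i in range(self.nb_models):
            model = RFM.load("{0}_{1}".format(self.models_weights, i))
            transformed = model.transform(test_data).select("prediction", "Perimetre") \
                               .withColumnRenamed("prediction", "prediction[{0}]".format(i))
            results = results.join(transformed, "Perimetre")
            # Materialize the join and cut the ever-growing lineage
            results = results.checkpoint(eager=True)

        prediction_exprs = ["prediction[{0}]".format(i) for i in range(self.nb_models)]
        return results.select(*prediction_exprs)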

Here is the error I get:

19/05/27 12:21:10 ERROR Executor: Exception in task 1.0 in stage 24151.0 (TID 17047)
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
        at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
19/05/27 12:21:10 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 17047,5,main]
java.lang.OutOfMemoryError: Java heap space
        ... (same stack trace as above)
19/05/27 12:21:10 ERROR TaskSetManager: Task 1 in stage 24151.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "PowercasterLoc.py", line 123, in <module>
    dataframe_prediction = predictions.toPandas()
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\pyspark\sql\dataframe.py", line 2142, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\pyspark\sql\dataframe.py", line 533, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\cbouyssi\AppData\Local\Continuum\anaconda3\envs\powercaster-imp\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2848.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 24151.0 failed 1 times, most recent failure: Lost task 1.0 in stage 24151.0 (TID 17047, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
        ... (same stack trace as above)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
        at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
        at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
        ... (same stack trace as above)
        ... 1 more

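The Python traceback shows the crash is triggered by predictions.toPandas() on line 123, i.e. while collecting the predictions back to the driver, and the log says "executor driver", so I am running in local mode where driver and executors share one JVM. For completeness, this is how that JVM's heap could be enlarged when the session is built (the 4g value is illustrative, not what I currently use):

    from pyspark.sql import SparkSession

    # In local mode the driver JVM also hosts the executors, so
    # spark.driver.memory bounds the whole heap. It must be set before
    # the JVM is launched, i.e. when the first session is created.
    spark = (SparkSession.builder
             .master("local[*]")
             .config("spark.driver.memory", "4g")
             .appName("powercaster")
             .getOrCreate())
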
Any ideas or suggestions about what is wrong here? Thanks!
