Ошибка Pyspark при запуске кода логистической регрессии - PullRequest
0 голосов
/ 29 ноября 2018

Мы используем Jupyter для pyspark.Мы попытались запустить логистическую регрессию, которая не дала никакой ошибки заранее.Ошибка появилась, когда мы попытались приспособить нашу dataframe к регрессионной модели.

Почему возникает эта ошибка?Это связано с проблемой памяти?Размер набора данных, который мы используем, составляет около 2 ГБ, и мы используем виртуальную машину для pyspark.Сообщение об ошибке вставлено ниже.

Может кто-нибудь сказать мне, почему это происходит и что мне нужно для этого сделать?

 Py4JJavaError   Traceback (most recent call last)
 <ipython-input-24-62d5302d4c2b> in <module>
 ----> 1 lrModel = lr.fit(df_dec)
 /opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
130                 return self.copy(params)._fit(dataset)
131             else:
--> 132                 return self._fit(dataset)
133         else:
134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
286
287     def _fit(self, dataset):
--> 288         java_model = self._fit_java(dataset)
289         model = self._create_model(java_model)
290         return self._copyValues(model)

/opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
283         """
284         self._transfer_params_to_java()
--> 285         return self._java_obj.fit(dataset._jdf)
286
287     def _fit(self, dataset):

/opt/spark/spark-2.3.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255         answer = self.gateway_client.send_command(command)
1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
1258
1259         for temp_arg in temp_args:

/opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
61     def deco(*a, **kw):
62         try:
---> 63             return f(*a, **kw)
 64         except py4j.protocol.Py4JJavaError as e:
 65             s = e.java_exception.toString()

/opt/spark/spark-2.3.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326                 raise Py4JJavaError(
327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
329             else:
330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o226.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task     0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 5, localhost, executor driver): scala.MatchError: [null,1.0,[1.0,3.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 Driver stacktrace:
at                             org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:518)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:488)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:278)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: [null,1.0,[1.0,3.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Вот полный код:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.classification import          (RandomForestClassifier,GBTClassifier, DecisionTreeClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.ml import Pipeline

spark =      SparkSession.builder.appName("decision_tree_classifier").getOrCreate()

df = spark.read.csv("hdfs:/user/Final2.csv", header= True)

df.printSchema()

from pyspark.sql.types import IntegerType
df = df.withColumn("detail_age", df["detail_age"].cast(IntegerType()))

df = df.withColumn("activity_code",     df["activity_code"].cast(IntegerType()))

df = df.withColumn("resident_status",     df["resident_status"].cast(IntegerType()))
df = df.withColumn("age_recode_12", df["age_recode_12"].cast(IntegerType()))
df = df.withColumn("infant_age_recode_22", df["infant_age_recode_22"].cast(IntegerType()))
df = df.withColumn("place_of_death_and_decedents_status",  df["place_of_death_and_decedents_status"].cast(IntegerType()))
df = df.withColumn("day_of_week_of_death", df["day_of_week_of_death"].cast(IntegerType()))
df = df.withColumn("manner_of_death", df["manner_of_death"].cast(IntegerType()))
df = df.withColumn("place_of_injury", df["place_of_injury"].cast(IntegerType()))
df = df.withColumn("race", df["race"].cast(IntegerType()))
df = df.withColumn("place_of_injury",     df["place_of_injury"].cast(IntegerType()))
df = df.withColumn("race_recode_5", df["race_recode_5"].cast(IntegerType()))
df = df.withColumn("hispanic_originrace_recode",     df["hispanic_originrace_recode"].cast(IntegerType()))

df = df.withColumn("education_2003_revision", df["education_2003_revision"].cast(IntegerType()))
df = df.withColumn("month_of_death", df["month_of_death"].cast(IntegerType()))

df.printSchema()

df.dtypes

df.take(5)

df = df.drop("_c0","X")

df.columns

df.printSchema()

from pyspark.sql.functions import col,expr,when
import numpy as np
import pandas as pd

df1 = df.withColumn("sex", when(df["sex"]=='M',1).otherwise(0))

df1.take(5)

df1.printSchema()

df_vec = VectorAssembler(inputCols=['race', 'resident_status', 'sex', 'month_of_death'], outputCol='features')

df_dec = df_vec.transform(df1)

train, test = df_dec.randomSplit([0.7,0.3])

lr = LogisticRegression(labelCol='manner_of_death')

lrModel = lr.fit(df_dec)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...