TL; DR : иногда PySpark VectorAssembler
выдает ошибку в моем конвейере, и иногда мой конвейер работает нормально.
Я использую PySpark 2.1.0 (это единственная версия, поддерживаемая ИТ-командой на работе). Мой конвейер состоит из применения StringIndexer
к категориальным функциям, OneHotEncoder
для этих индексов и VectorAssembler
для объединения этих кодировок с числовыми функциями, которые не требуют предварительной обработки (они уже были нормализованы как часть моей функцииинженерия - я называю их "как есть" ниже). Затем я передаю эти функции в алгоритм DecisionTreeClassifier
, но мой код выходит из строя до этого шага, если он выходит из строя.
Ошибка, которую я получу, если я ее получу: Caused by: org.apache.spark.SparkException: Values to assemble cannot be null
. Я проверил, и все столбцы, используемые в качестве объектов, не содержат нулевых значений. (Я вменяю их ранее, и я убедился, что это приводит к нулям.)
Если конвейер работает в данном ядре, он всегда работает в этом ядре. Если полный конвейер не работает в данном ядре, он всегда не работает в этом ядре. Если я разбиваю нерабочий конвейер на части - то есть обрабатываю категориальные особенности отдельно от числовых «как есть» - иногда категориальный конвейер работает, а числовой - нет, а в других случаях все наоборот.
Я загружаю данные в PySpark из Hive, используя sqlContext.sql("SELECT * FROM <table>")
. <table>
содержит очищенные данные и инженерные функции и не меняется между успешными и неудачными конвейерами. Мне интересно, различается ли порядок данных при каждом вызове SELECT *
(то есть каждый раз, когда я запускаю новое ядро), и некоторые порядки данных вызывают ошибку - хотя я не смог распознать шаблон. Я нашел сообщение в блоге , в котором содержится VectorAssembler
для вывода метаданных из первой строки набора данных. Но если мой ввод не содержит нулей, порядок не должен иметь значения, не так ли? Единственное, о чем я могу подумать, это первая строка, содержащая «пустой» вектор из поведения OneHotEncoder
dropLast=True
(например, (2, [], [])
), но разве VectorAssembler
не сможет справиться с этим?
Вот мой код. Я заранее прошу прощения за невозможность предоставить данные, которые воссоздают ошибку или приводят к успешному конвейеру, но мой набор данных большой и проприетарный. Я могу попытаться предоставить некоторые фрагменты, если это поможет в устранении неполадок. Спасибо.
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
# load data from Hive
train_df = sqlContext.sql('select * from <hive table>')
# impute missing data
asis_cols = [asis_col1, asis_col2, ...]
ohe_cols = ['fac_usg', 'src_type']
train_df_imp = (train_df.na.fill(0, subset=asis_cols)
.na.fill('blank', subset=ohe_cols))
# create pipeline
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idxd") for c in ohe_cols]
encoders = [OneHotEncoder(inputCol=c+"_idxd", outputCol=c+"_enc") for c in ohe_cols]
assembly_list = asis_cols + [c+"_enc" for c in ohe_cols]
assembler = VectorAssembler(inputCols=assembly_list, outputCol='features')
labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')
alg = DecisionTreeClassifier(labelCol='indexedLabel', minInstancesPerNode=3)
pipe = Pipeline(stages=indexers + encoders + [assembler] + [labelIndexer] + [alg])
# fit model
model = pipe.fit(train_df_imp)
Редактировать : Вот схема train_df
. Все asis_col
имеют тип double
.
root
|-- rec_id: integer (nullable = true)
|-- rec_date: date (nullable = true)
|-- amount_usd: double (nullable = true)
|-- src_type: string (nullable = true)
|-- fac_usg: string (nullable = true)
|-- description: string (nullable = true)
|-- asis_col1: double (nullable = true)
|-- asis_col2: double (nullable = true)
...
Пример шагов, которые я предпринял, чтобы подтвердить, что na.fill
работал:
from pyspark.sql.functions import count, isnan, when, col
train_df_imp.select([count(when(isnan(c), c)).alias(c) for c in asis_cols]).toPandas().loc[0].sum()
>>> 0
Стековая трассировка сообщения об ошибке:
Py4JJavaError: An error occurred while calling o1468.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 86.0 failed 4 times, most recent failure: Lost task 0.3 in stage 86.0 (TID 4907, poldcdhdn010.dev.intranet, executor 4): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<asis_col1:double,... 13 more fields>) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
... 29 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1326)
at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:112)
at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:105)
at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:116)
at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:45)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:96)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<asis_col1:double,... 13 more fields>) => vector)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
... 29 more