PySpark VectorAssembler «Значения для сборки не могут быть нулевыми», когда нет нулевых значений - PullRequest
2 голосов
/ 11 октября 2019

TL; DR : иногда PySpark VectorAssembler выдает ошибку в моем конвейере, и иногда мой конвейер работает нормально.

Я использую PySpark 2.1.0 (это единственная версия, поддерживаемая ИТ-командой на работе). Мой конвейер состоит из применения StringIndexer к категориальным функциям, OneHotEncoder для этих индексов и VectorAssembler для объединения этих кодировок с числовыми функциями, которые не требуют предварительной обработки (они уже были нормализованы как часть моей функцииинженерия - я называю их "как есть" ниже). Затем я передаю эти функции в алгоритм DecisionTreeClassifier, но мой код выходит из строя до этого шага, если он выходит из строя.

Ошибка, которую я получу, если я ее получу: Caused by: org.apache.spark.SparkException: Values to assemble cannot be null. Я проверил, и все столбцы, используемые в качестве объектов, не содержат нулевых значений. (Я вменяю их ранее, и я убедился, что это приводит к нулям.)

Если конвейер работает в данном ядре, он всегда работает в этом ядре. Если полный конвейер не работает в данном ядре, он всегда не работает в этом ядре. Если я разбиваю нерабочий конвейер на части - то есть обрабатываю категориальные особенности отдельно от числовых «как есть» - иногда категориальный конвейер работает, а числовой - нет, а в других случаях все наоборот.

Я загружаю данные в PySpark из Hive, используя sqlContext.sql("SELECT * FROM <table>"). <table> содержит очищенные данные и инженерные функции и не меняется между успешными и неудачными конвейерами. Мне интересно, различается ли порядок данных при каждом вызове SELECT * (то есть каждый раз, когда я запускаю новое ядро), и некоторые порядки данных вызывают ошибку - хотя я не смог распознать шаблон. Я нашел сообщение в блоге , в котором содержится VectorAssembler для вывода метаданных из первой строки набора данных. Но если мой ввод не содержит нулей, порядок не должен иметь значения, не так ли? Единственное, о чем я могу подумать, это первая строка, содержащая «пустой» вектор из поведения OneHotEncoder dropLast=True (например, (2, [], [])), но разве VectorAssembler не сможет справиться с этим?

Вот мой код. Я заранее прошу прощения за невозможность предоставить данные, которые воссоздают ошибку или приводят к успешному конвейеру, но мой набор данных большой и проприетарный. Я могу попытаться предоставить некоторые фрагменты, если это поможет в устранении неполадок. Спасибо.

from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

# load data from Hive 
train_df = sqlContext.sql('select * from <hive table>')

# impute missing data
asis_cols = [asis_col1, asis_col2, ...]
ohe_cols = ['fac_usg', 'src_type']
train_df_imp = (train_df.na.fill(0, subset=asis_cols)
                        .na.fill('blank', subset=ohe_cols))

# create pipeline
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idxd") for c in ohe_cols]
encoders = [OneHotEncoder(inputCol=c+"_idxd", outputCol=c+"_enc") for c in ohe_cols]

assembly_list = asis_cols + [c+"_enc" for c in ohe_cols]
assembler = VectorAssembler(inputCols=assembly_list, outputCol='features')

labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')
alg = DecisionTreeClassifier(labelCol='indexedLabel', minInstancesPerNode=3)

pipe = Pipeline(stages=indexers + encoders + [assembler] + [labelIndexer] + [alg])

# fit model
model = pipe.fit(train_df_imp)

Редактировать : Вот схема train_df. Все asis_col имеют тип double.

root
 |-- rec_id: integer (nullable = true)
 |-- rec_date: date (nullable = true)
 |-- amount_usd: double (nullable = true)
 |-- src_type: string (nullable = true)
 |-- fac_usg: string (nullable = true)
 |-- description: string (nullable = true)
 |-- asis_col1: double (nullable = true)
 |-- asis_col2: double (nullable = true)
 ...

Пример шагов, которые я предпринял, чтобы подтвердить, что na.fill работал:

from pyspark.sql.functions import count, isnan, when, col

train_df_imp.select([count(when(isnan(c), c)).alias(c) for c in asis_cols]).toPandas().loc[0].sum()
>>> 0

Стековая трассировка сообщения об ошибке:

Py4JJavaError: An error occurred while calling o1468.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 86.0 failed 4 times, most recent failure: Lost task 0.3 in stage 86.0 (TID 4907, poldcdhdn010.dev.intranet, executor 4): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<asis_col1:double,... 13 more fields>) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
    ... 29 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1353)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1326)
    at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:112)
    at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:105)
    at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:116)
    at org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:45)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:96)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<asis_col1:double,... 13 more fields>) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:160)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:143)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:143)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
    at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
    ... 29 more
...