Прогнозирование Sparkml с использованием новых данных дает исключение ArrayIndexOutOfBoundsException - PullRequest
0 голосов
/ 24 апреля 2020

Я обучил и сохранил модель, и теперь я пытаюсь использовать эту модель для прогнозирования с использованием новых данных. Я перепробовал много вещей, и всегда ошибки совпадают с представленными ниже.

Просто, чтобы вы знали, что я пытался создать вручную фрейм данных, а также я пытался вручную создать объекты Векторы и не работает.

Я использую тот же конвейер, который использовался и при создании модели. Я не мог себе представить, что эта простая вещь была бы такой болезненной, если бы использовалась искра.

Версия Spark: 2.4.4 , и я использую Python в этом случае .

Помогите, пожалуйста!

Загрузка модели и торможение:

gbt_model_load  = GBTRegressionModel.load('gbt_model/')
gbt_model_pred  = gbt_model_load.transform(dfFeat1)
gbt_model_pred.printSchema()
gbt_model_pred.select('budgetvalue','prediction').show()

Ошибка:

Py4JJavaError: An error occurred while calling o8585.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 589.0 failed 1 times, most recent failure: Lost task 0.0 in stage 589.0 (TID 2134, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => double)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1226
    at org.apache.spark.ml.linalg.DenseVector.apply(Vectors.scala:462)

Код детали:

data_df = spark.read.csv('abt/data/*',header=True, inferSchema=True)

def featuresCreation(df):
    continuous_cols  = ['a','b','c']
    categorical_cols = ['d','e']

    categorical_indexers = [
        StringIndexer(inputCol=column, outputCol="{0}_indexed".format(column), handleInvalid='keep')
        for column in categorical_cols
]

    categorical_features = [
        OneHotEncoder(
            inputCol=indexer.getOutputCol(),
            outputCol="{0}_encoded".format(indexer.getOutputCol()))
         for indexer in categorical_indexers
    ]

    continuous_feature = [VectorAssembler(
        inputCols=[column for column in continuous_cols],
        outputCol="continuous_features"
   )]

    continuous_feature_standard = [StandardScaler(inputCol='continuous_features', outputCol='scaled_continuous_features')]

    all_features = continuous_feature_standard + categorical_features

    universal_assembler = VectorAssembler(
    inputCols=[feature.getOutputCol() for feature in all_features],
    outputCol="features"
    )

    estimator = categorical_indexers + \
                categorical_features + \
                continuous_feature + \
                continuous_feature_standard + \
                [universal_assembler]

    universal_pipeline = Pipeline(stages=estimator)
    #universal_pipeline.save('universal_pipeline/')
    data_features_df   = universal_pipeline.fit(df).transform(df)

    return data_features_df

Применение конвейера:

data_features_df = featuresCreation(data_df)

Обучение модели:

gbt           = GBTRegressor(featuresCol='features',labelCol='budgetvalue', maxDepth=3, maxIter=40)
gbt_model     = gbt.fit(train_df)
gbt_pred      = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='rmse')
gbt_rmse      = gbt_evaluator.evaluate(gbt_pred)
...