Я обучил и сохранил модель, и теперь я пытаюсь использовать эту модель для прогнозирования с использованием новых данных. Я перепробовал много вещей, и всегда ошибки совпадают с представленными ниже.
Просто, чтобы вы знали, что я пытался создать вручную фрейм данных, а также я пытался вручную создать объекты Векторы и не работает.
Я использую тот же конвейер, который использовался и при создании модели. Я не мог себе представить, что эта простая вещь была бы такой болезненной, если бы использовалась искра.
Версия Spark: 2.4.4 , и я использую Python в этом случае .
Помогите, пожалуйста!
Загрузка модели и торможение:
gbt_model_load = GBTRegressionModel.load('gbt_model/')
gbt_model_pred = gbt_model_load.transform(dfFeat1)
gbt_model_pred.printSchema()
gbt_model_pred.select('budgetvalue','prediction').show()
Ошибка:
Py4JJavaError: An error occurred while calling o8585.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 589.0 failed 1 times, most recent failure: Lost task 0.0 in stage 589.0 (TID 2134, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) => double)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1226
at org.apache.spark.ml.linalg.DenseVector.apply(Vectors.scala:462)
Код детали:
data_df = spark.read.csv('abt/data/*',header=True, inferSchema=True)
def featuresCreation(df):
continuous_cols = ['a','b','c']
categorical_cols = ['d','e']
categorical_indexers = [
StringIndexer(inputCol=column, outputCol="{0}_indexed".format(column), handleInvalid='keep')
for column in categorical_cols
]
categorical_features = [
OneHotEncoder(
inputCol=indexer.getOutputCol(),
outputCol="{0}_encoded".format(indexer.getOutputCol()))
for indexer in categorical_indexers
]
continuous_feature = [VectorAssembler(
inputCols=[column for column in continuous_cols],
outputCol="continuous_features"
)]
continuous_feature_standard = [StandardScaler(inputCol='continuous_features', outputCol='scaled_continuous_features')]
all_features = continuous_feature_standard + categorical_features
universal_assembler = VectorAssembler(
inputCols=[feature.getOutputCol() for feature in all_features],
outputCol="features"
)
estimator = categorical_indexers + \
categorical_features + \
continuous_feature + \
continuous_feature_standard + \
[universal_assembler]
universal_pipeline = Pipeline(stages=estimator)
#universal_pipeline.save('universal_pipeline/')
data_features_df = universal_pipeline.fit(df).transform(df)
return data_features_df
Применение конвейера:
data_features_df = featuresCreation(data_df)
Обучение модели:
gbt = GBTRegressor(featuresCol='features',labelCol='budgetvalue', maxDepth=3, maxIter=40)
gbt_model = gbt.fit(train_df)
gbt_pred = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol='budgetvalue', predictionCol='prediction', metricName='rmse')
gbt_rmse = gbt_evaluator.evaluate(gbt_pred)