CrossValidation / TrainValidationSplit с несколькими конвейерами в PySpark - PullRequest
0 голосов
/ 11 октября 2018

Я пытаюсь оценить несколько конвейеров в PySpark.Я могу сделать это в отдельном CV / TVS для каждого, но я хотел бы сделать это только в одном, чтобы он дал мне лучшую модель напрямую, и я не могу выяснить, как заставить это работать.

lr_assembler и assembler являются 2 экземплярами VectorAsembler (выбор различных функций).

pca, lr, rf и gbt являются экземплярами PCA, LinearRegression, RandomForestRegressor и GBTRegressor.

Определение конвейеров:

pipeline = Pipeline()

lr_stages = [lr_assembler, pca, lr]
rf_stages = [assembler, rf]
gbt_stages = [assembler, gbt]

lr_pipeline = Pipeline(stages=lr_stages)
rf_pipeline = Pipeline(stages=rf_stages)
gbt_pipeline = Pipeline(stages=gbt_stages)

Определение paramMaps:

lr_grid = ParamGridBuilder().baseOn({pipeline.stages:lr_stages})\
                            .addGrid(pca.k, [2, 5, 7])\
                            .build()

rf_grid = ParamGridBuilder().baseOn({pipeline.stages:rf_stages})\
                            .addGrid(rf.maxDepth, [5, 10])\
                            .addGrid(rf.featureSubsetStrategy, ['3', '6'])\
                            .build()

gbt_grid = ParamGridBuilder().baseOn({pipeline.stages:gbt_stages})\
                             .addGrid(gbt.maxDepth, [5, 10])\
                             .addGrid(gbt.maxIter, [50, 100])\
                             .build()

grid = lr_grid + rf_grid + gbt_grid

Определение TrainValidationSplit:

tvs = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=grid, evaluator=rmse_evaluator, trainRatio=0.8, parallelism=3, seed=7)

Обучение модели:

model = tvs.fit(train_val)

И после выполнения этой последней строки я получаю сообщение об ошибке (не уверен, стоит ли мне публиковать здесь все):

KeyError: Param(parent='Pipeline_40f78ef0cee04a4ebc61', name='stages', doc='a list of pipeline stages')

Спасибо за ваше время.

1 Ответ

0 голосов
/ 05 марта 2019

У меня была та же проблема, которую я решил, инициализировав этапы конвейера.

pipeline = Pipeline(stages=[])  # Must initialize with empty list!

Вот хороший пример такого подхода: https://github.com/dsharpc/dsharpc.github.io/blob/master/SparkMLFlights/README.md

...