PySpark конвейер для выбора модели и настройки параметров - PullRequest
0 голосов
/ 18 февраля 2019

Я хочу построить конвейер PySpark для настройки гиперпараметров и выбора модели.Ниже приведен код, который я придумал, следуя некоторым веб-примерам / учебным пособиям.Поскольку я очень плохо знаком с PySpark (и Spark), я надеялся, что, возможно, кто-нибудь поможет мне пересмотреть код и, если возможно, дать мне несколько советов по оптимизации.

Импорт

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from spark_stratifier import StratifiedCrossValidator

Считать файл (просто заменив '.' На '_', так как PySpark запутывается, если имена столбцов содержат '.'):

credit = spark.read.options(header = True, sep=';',
                            inferSchema = True
                           ).csv('..//bank-additional/'+\
                                 'bank-additional-full.csv')
colnames = list(map(lambda x: x.replace(".", "_"), credit.columns))
credit = credit.toDF(*colnames)

идентифицирует числовые и категориальные столбцы

numerics = [x[0] for x in credit.dtypes if not x[1].startswith('string')]
categoricals = [x[0] for x in credit.dtypes if x[1].startswith('string')]
categoricals.remove('y')

Определить преобразования данных и классификатор, который может быть подключен позже в конвейер.MLlib требует, чтобы входные объекты были сформированы как векторы, поэтому VectorAssembler используется для формирования фрейма данных для классификатора:

y_indexer = StringIndexer(inputCol='y', outputCol='label') 
x_indexers = [StringIndexer(inputCol=column, 
                            outputCol=column+"_indx")
              for column in list(set(categoricals)) ]
assembler = VectorAssembler(inputCols= numerics, outputCol="raw_features")

standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='raw_features',
                              outputCol='std_features')
df_builder = VectorAssembler(inputCols=['std_features']+
                             [l+'_indx' for l in categoricals],
                             outputCol='features')
rf = RandomForestClassifier()

Конвейер предварительной обработки данных.Я кэшировал данные после всех этапов предварительной обработки, предполагая, что они будут использованы позже (?):

pipe_pre = Pipeline(stages=[y_indexer]+x_indexers+\
                    [assembler, standardizer, df_builder])
credit_pre = pipe_pre.fit(credit).transform(credit).\
                select('label', 'features').cache()

Параметры сетки и конвейер анализа для передачи в StratifiedCrossValidator (это просто для того, чтобы код работал, параметры сетки иnfolds - фиктивный выбор):

grid_params = ParamGridBuilder()\
                    .addGrid(rf.maxDepth, [3,6]) \
                    .addGrid(rf.numTrees, [5,10]).build()

pipe_ana = Pipeline(stages=[rf])

crossval = StratifiedCrossValidator(
    estimator=pipe_ana,
    estimatorParamMaps=grid_params,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=2)

model = crossval.fit(credit_pre)
results = model.transform(credit_pre)
predictionLabels = results.select("prediction", "label")
metrics = BinaryClassificationMetrics(predictionLabels.rdd)
metrics.areaUnderROC

На данный момент все выглядит разумно.Однако цель состоит в том, чтобы запустить Grid Search с использованием PySpark для параллелизации задач.В связи с этим, это разумный подход, какие-либо советы по оптимизации?

спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...