Я хочу построить конвейер PySpark для настройки гиперпараметров и выбора модели.Ниже приведен код, который я придумал, следуя некоторым веб-примерам / учебным пособиям.Поскольку я очень плохо знаком с PySpark (и Spark), я надеялся, что, возможно, кто-нибудь поможет мне пересмотреть код и, если возможно, дать мне несколько советов по оптимизации.
Импорт
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from spark_stratifier import StratifiedCrossValidator
Считать файл (просто заменив '.' На '_', так как PySpark запутывается, если имена столбцов содержат '.'):
credit = spark.read.options(header = True, sep=';',
inferSchema = True
).csv('..//bank-additional/'+\
'bank-additional-full.csv')
colnames = list(map(lambda x: x.replace(".", "_"), credit.columns))
credit = credit.toDF(*colnames)
идентифицирует числовые и категориальные столбцы
numerics = [x[0] for x in credit.dtypes if not x[1].startswith('string')]
categoricals = [x[0] for x in credit.dtypes if x[1].startswith('string')]
categoricals.remove('y')
Определить преобразования данных и классификатор, который может быть подключен позже в конвейер.MLlib требует, чтобы входные объекты были сформированы как векторы, поэтому VectorAssembler используется для формирования фрейма данных для классификатора:
y_indexer = StringIndexer(inputCol='y', outputCol='label')
x_indexers = [StringIndexer(inputCol=column,
outputCol=column+"_indx")
for column in list(set(categoricals)) ]
assembler = VectorAssembler(inputCols= numerics, outputCol="raw_features")
standardizer = StandardScaler(withMean=True, withStd=True,
inputCol='raw_features',
outputCol='std_features')
df_builder = VectorAssembler(inputCols=['std_features']+
[l+'_indx' for l in categoricals],
outputCol='features')
rf = RandomForestClassifier()
Конвейер предварительной обработки данных.Я кэшировал данные после всех этапов предварительной обработки, предполагая, что они будут использованы позже (?):
pipe_pre = Pipeline(stages=[y_indexer]+x_indexers+\
[assembler, standardizer, df_builder])
credit_pre = pipe_pre.fit(credit).transform(credit).\
select('label', 'features').cache()
Параметры сетки и конвейер анализа для передачи в StratifiedCrossValidator (это просто для того, чтобы код работал, параметры сетки иnfolds - фиктивный выбор):
grid_params = ParamGridBuilder()\
.addGrid(rf.maxDepth, [3,6]) \
.addGrid(rf.numTrees, [5,10]).build()
pipe_ana = Pipeline(stages=[rf])
crossval = StratifiedCrossValidator(
estimator=pipe_ana,
estimatorParamMaps=grid_params,
evaluator=BinaryClassificationEvaluator(),
numFolds=2)
model = crossval.fit(credit_pre)
results = model.transform(credit_pre)
predictionLabels = results.select("prediction", "label")
metrics = BinaryClassificationMetrics(predictionLabels.rdd)
metrics.areaUnderROC
На данный момент все выглядит разумно.Однако цель состоит в том, чтобы запустить Grid Search с использованием PySpark для параллелизации задач.В связи с этим, это разумный подход, какие-либо советы по оптимизации?
спасибо!