Я сейчас использую следующие данные:
https://github.com/apache/spark/blob/master/data/mllib/iris_libsvm.txt
Я хочу обновить модель OneVsRest для прогнозирования классов (многоуровневое прогнозирование) с вероятностями, превышающими определенный порог, вместо прогнозирования класса с максимальной вероятностью.
Пока мне удалось это реализовать, но не в конвейере.
from pyspark.ml.classification import RandomForestClassifier, OneVsRest
from pyspark.mllib.evaluation import MultilabelMetrics
from pyspark.sql.types import DoubleType, IntegerType, ArrayType
from pyspark.sql.functions import lit, udf, row_number, col, array
from pyspark.sql.window import Window
# load data file.
inputData = spark.read.format("libsvm") \
.load("iris_libsvm.txt")
# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])
# instantiate the base classifier.
rf = RandomForestClassifier()
# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=rf)
# train the multiclass model.
ovrModel = ovr.fit(train)
# score the model on test data. (default scoring)
predictions = ovrModel.transform(test)
Как только я подгоняю общую модель, я могу получить доступ ко всем моделям для каждого класса, используя ovrModel.models и выполнить цикл по этим моделям, чтобы вычислить вероятность для каждого класса для каждого наблюдения в моем наборе тестов, а затем выбрать классы, у которых pr> порог.
def ith_(v, i):
try:
return float(v[i])
except ValueError:
return None
ith = udf(ith_, DoubleType())
# Looping over each model over the test set and calculating the probability of each class.
# pr0 is the column that represents the probability of class 0
# pr1 is the column that represents the probability of class 1
# ...
for i in range(len(ovrModel.models)):
pr = ovrModel.models[i].transform(test)
w = Window().orderBy("features")
pr = pr.withColumn("row_number", row_number().over(w))
predictions = predictions.withColumn("row_number", row_number().over(w))
pr = pr.select("row_number", ith("probability", lit(1)))
pr = pr.select("row_number", col('ith_(probability, 1)').alias("pr" + str(i)))
predictions = predictions.join(pr, "row_number")
predictions = predictions.drop("row_number")
cols = predictions.columns
cols = [elt for elt in cols if elt.startswith("pr")]
cols.remove("prediction")
# Once probabilities are calculated, I use a 0.5 threshold to determine which class(es) to predict.
# I store result in a column preds of type ArrayType(DoubleType())
threshold = 0.5
for c in cols:
predictions = predictions.withColumn(c, (col(c) >= threshold).cast('int'))
def index_(v):
l = []
for i, j in enumerate(v):
if j == 1:
l.append(i)
return l
index = udf(index_, ArrayType(IntegerType()))
predictions = predictions.withColumn('preds', index("pr"))
predictions = predictions.withColumn("preds", predictions.preds.cast("array<double>"))
# Transform the type of label column to ArrayType(DoubleType()) as well
predictions = predictions.withColumn("label", array("label"))
Конечный фрейм данных должен выглядеть примерно так:
Как только это будет сделано, я могу использовать метрики с несколькими метками, предопределенные Spark, для расчета таких метрик, как точность.
scoresAndLabels = predictions.select("label", "preds").rdd
metrics = MultilabelMetrics(scoresAndLabels)
# Summary stats
print("Recall = %s" % metrics.recall())
print("Precision = %s" % metrics.precision())
print("F1 measure = %s" % metrics.f1Measure())
print("Accuracy = %s" % metrics.accuracy)
Мой вопрос:
Как я могу реализовать все шаги сразу после ovrModel = ovr.fit (train) в собственном преобразователе, чтобы я мог использовать это позже в конвейере для перекрестной проверки.
Заранее спасибо