PySpark Определите пользовательский трансформатор, который принимает в качестве входных данных подогнанную модель предыдущего шага - PullRequest
0 голосов
/ 09 ноября 2018

Я сейчас использую следующие данные:

https://github.com/apache/spark/blob/master/data/mllib/iris_libsvm.txt

Я хочу обновить модель OneVsRest для прогнозирования классов (многоуровневое прогнозирование) с вероятностями, превышающими определенный порог, вместо прогнозирования класса с максимальной вероятностью.

Пока мне удалось это реализовать, но не в конвейере.

from pyspark.ml.classification import RandomForestClassifier, OneVsRest
from pyspark.mllib.evaluation import MultilabelMetrics

from pyspark.sql.types import DoubleType, IntegerType, ArrayType
from pyspark.sql.functions import lit, udf, row_number, col, array
from pyspark.sql.window import Window

# load data file.
inputData = spark.read.format("libsvm") \
    .load("iris_libsvm.txt")

# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])

# instantiate the base classifier.
rf = RandomForestClassifier()

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=rf)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data. (default scoring)
predictions = ovrModel.transform(test)

Как только я подгоняю общую модель, я могу получить доступ ко всем моделям для каждого класса, используя ovrModel.models и выполнить цикл по этим моделям, чтобы вычислить вероятность для каждого класса для каждого наблюдения в моем наборе тестов, а затем выбрать классы, у которых pr> порог.

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

# Looping over each model over the test set and calculating the probability of each class. 
# pr0 is the column that represents the probability of class 0 
# pr1 is the column that represents the probability of class 1 
# ...


for i in range(len(ovrModel.models)):
    pr = ovrModel.models[i].transform(test)
    w = Window().orderBy("features")
    pr = pr.withColumn("row_number", row_number().over(w))
    predictions = predictions.withColumn("row_number", row_number().over(w))

    pr = pr.select("row_number", ith("probability", lit(1)))
    pr = pr.select("row_number", col('ith_(probability, 1)').alias("pr" + str(i)))

    predictions = predictions.join(pr, "row_number")
    predictions = predictions.drop("row_number")

cols = predictions.columns
cols = [elt for elt in cols if elt.startswith("pr")]
cols.remove("prediction")

# Once probabilities are calculated, I use a 0.5 threshold to determine which class(es) to predict. 
# I store result in a column preds of type ArrayType(DoubleType())

threshold = 0.5

for c in cols:
    predictions = predictions.withColumn(c, (col(c) >= threshold).cast('int'))

def index_(v):
    l = []
    for i, j in enumerate(v):
        if j == 1:
            l.append(i)
    return l

index = udf(index_, ArrayType(IntegerType()))

predictions = predictions.withColumn('preds', index("pr"))
predictions = predictions.withColumn("preds", predictions.preds.cast("array<double>"))

# Transform the type of label column to ArrayType(DoubleType()) as well
predictions = predictions.withColumn("label", array("label"))

Конечный фрейм данных должен выглядеть примерно так:

enter image description here

Как только это будет сделано, я могу использовать метрики с несколькими метками, предопределенные Spark, для расчета таких метрик, как точность.

scoresAndLabels = predictions.select("label", "preds").rdd
metrics = MultilabelMetrics(scoresAndLabels)

# Summary stats
print("Recall = %s" % metrics.recall())
print("Precision = %s" % metrics.precision())
print("F1 measure = %s" % metrics.f1Measure())
print("Accuracy = %s" % metrics.accuracy)

Мой вопрос: Как я могу реализовать все шаги сразу после ovrModel = ovr.fit (train) в собственном преобразователе, чтобы я мог использовать это позже в конвейере для перекрестной проверки.

Заранее спасибо

...