Apache Spark StringIndexer applies non-existent labels (Unseen label exception)
0 votes
/ February 20, 2019

I am trying to run a random forest classification with PySpark 2.3.0. My dataset contains three columns that are strings, so I use a StringIndexer to convert them to numbers. Unfortunately, during evaluation the indexer unexpectedly finds labels that do not occur anywhere in the dataset.

Here is an excerpt from my dataset (the last column is the 0/1 label):

Year,Month,DayofMonth,DayOfWeek,DepTime,UniqueCarrier,Origin,Dest,Distance,DepDelay15Min
2004,1,12,1,623,UA,ORD,CLT,599,0
2004,1,13,2,621,UA,ORD,CLT,599,0
2004,1,14,3,633,UA,ORD,CLT,599,0

Here is my script:

CSV_PATH = "data/mllib/2004_10000_small.csv"
APP_NAME = "Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 13579
TRAINING_DATA_RATIO = 0.7
RF_NUM_TREES = 10
RF_MAX_DEPTH = 30
RF_MAX_BINS = 2048
LABEL = "DepDelay15Min"
CATEGORICAL_FEATURES = ["UniqueCarrier", "Origin", "Dest"]

from pyspark import SparkContext
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SparkSession
from time import time

# Creates Spark Session
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Reads in CSV file as DataFrame
# header: The first line of the file is used to name the columns and is not included in the data. All types are assumed to be strings.
# inferSchema: Automatically infer column types. It requires one extra pass over the data.
df = spark.read.options(header="true", inferSchema="true").csv(CSV_PATH)

# Transforms all strings into indexed numbers
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in CATEGORICAL_FEATURES]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

# Removes old string columns
df = df.drop(*CATEGORICAL_FEATURES)

# Moves the label to the last column
df = StringIndexer(inputCol=LABEL, outputCol=LABEL+"_label").fit(df).transform(df)
df = df.drop(LABEL)

# Converts the DataFrame into a LabeledPoint Dataset with the last column being the label and the rest the features.
transformed_df = df.rdd.map(lambda row: LabeledPoint(row[-1], Vectors.dense(row[0:-1])))

# Splits the dataset into a training and testing set according to the defined ratio using the defined random seed.
splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
training_data, test_data = transformed_df.randomSplit(splits, RANDOM_SEED)

print("Number of training set rows: %d" % training_data.count())
print("Number of test set rows: %d" % test_data.count())

# Run algorithm and measure runtime
start_time = time()

model = RandomForest.trainClassifier(training_data, numClasses=2, categoricalFeaturesInfo={}, numTrees=RF_NUM_TREES, featureSubsetStrategy="auto", impurity="gini", maxDepth=RF_MAX_DEPTH, maxBins=RF_MAX_BINS, seed=RANDOM_SEED)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

# Make predictions and compute accuracy
predictions = model.predict(test_data.map(lambda x: x.features))
labels_and_predictions = test_data.map(lambda x: x.label).zip(predictions)
acc = labels_and_predictions.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
print("Model accuracy: %.3f%%" % (acc * 100))

When labels_and_predictions.filter() is executed at the very end, I get the following error message:

Caused by: org.apache.spark.SparkException: Unseen label: OR.  To handle unseen labels, set Param handleInvalid to keep.
        at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$9.apply(StringIndexer.scala:260)
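The exception text itself names a workaround: StringIndexer has a handleInvalid param, and setting it to "keep" maps labels unseen during fit() to one extra index (numLabels) instead of failing at transform() time. A minimal sketch of that option (it only papers over the symptom, it does not explain where "OR" comes from):

from pyspark.ml.feature import StringIndexer

# "keep" routes unseen labels into an additional bucket instead of
# raising the SparkException quoted above (supported since Spark 2.2).
indexer = StringIndexer(inputCol="Origin", outputCol="Origin_index", handleInvalid="keep")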

However, the label "OR" does not exist anywhere in the dataset, only "ORD". I have tried different datasets, and it turns out that Spark keeps cutting off the last letter of strings in the "Origin" column. I have no idea which part of the script could be responsible for this. Any ideas on how to investigate further? Thanks in advance!
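One way to narrow this down (a suggested sketch, meant to run right after the CSV is read and before the string columns are dropped): compare the distinct values actually present in the column with the label table the fitted StringIndexerModel learned.

# Hypothetical debugging snippet: what the data contains vs. what the indexer learned.
df.select("Origin").distinct().orderBy("Origin").show(500, truncate=False)

origin_model = StringIndexer(inputCol="Origin", outputCol="Origin_index").fit(df)
print(origin_model.labels)  # label-to-index table, ordered by descending label frequency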

1 Answer

0 votes
/ February 21, 2019

As Erik pointed out, I was using the deprecated MLlib instead of the ML library. I still do not understand why the original script failed, but after porting it to ML it works. Here is the new solution, inspired by this example: https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
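For context on the two APIs: pyspark.mllib is the old RDD-based interface and has been in maintenance mode since Spark 2.0, while pyspark.ml is the DataFrame-based replacement. The imports make the switch visible:

from pyspark.mllib.tree import RandomForest                   # old RDD-based API (maintenance only)
from pyspark.ml.classification import RandomForestClassifier  # DataFrame-based API used below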

CSV_PATH = "data/mllib/2004_10000_small.csv"
APP_NAME = "Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 13579
TRAINING_DATA_RATIO = 0.7
VI_MAX_CATEGORIES = 4
RF_NUM_TREES = 10
RF_MAX_DEPTH = 30
RF_MAX_BINS = 2048
LABEL = "DepDelay15Min"
CATEGORICAL_FEATURES = ["UniqueCarrier", "Origin", "Dest"]

from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.sql import SparkSession
from time import time

# Creates Spark Session
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Reads in CSV file as DataFrame
# header: The first line of the file is used to name the columns and is not included in the data. All types are assumed to be strings.
# inferSchema: Automatically infer column types. It requires one extra pass over the data.
data = spark.read.options(header="true", inferSchema="true").csv(CSV_PATH)

# Transforms all string features into indexed numbers
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(data) for column in CATEGORICAL_FEATURES]
pipeline = Pipeline(stages=indexers)
data = pipeline.fit(data).transform(data)

# Removes old string columns
data = data.drop(*CATEGORICAL_FEATURES)

# Indexes the label and moves it to the last column
data = StringIndexer(inputCol=LABEL, outputCol="label").fit(data).transform(data)
data = data.drop(LABEL)

# Assembles all feature columns and moves them to the last column
assembler = VectorAssembler(inputCols=data.columns[0:-1], outputCol="features")
data = assembler.transform(data)

# Remove all columns but label and features
data = data.drop(*data.columns[0:-2])

# Splits the dataset into a training and testing set according to the defined ratio using the defined random seed.
splits = [TRAINING_DATA_RATIO, 1.0 - TRAINING_DATA_RATIO]
trainingData, testData = data.randomSplit(splits, RANDOM_SEED)

print("Number of training set rows: %d" % trainingData.count())
print("Number of test set rows: %d" % testData.count())

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > VI_MAX_CATEGORIES distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=VI_MAX_CATEGORIES).fit(data)

# Train a RandomForest model.
randomForest = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES, maxBins=RF_MAX_BINS)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, randomForest, labelConverter])

# Train model. This also runs the indexers. Measures and reports the execution time as well.
start_time = time()
model = pipeline.fit(trainingData)
end_time = time()
print("Time to train model: %.3f seconds" % (end_time - start_time))

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
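If useful, the fitted forest also exposes per-feature importances (a small addition beyond the linked docs example):

print(rfModel.featureImportances)  # SparseVector over the assembled "features" vector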