По сути, вы пытаетесь разделить набор данных на обучение и тестирование, собрать функции, запустить PCA, а затем классификатор, чтобы что-то предсказать. Общая логика c верна, но с вашим кодом есть несколько проблем.
- PCA в искре нуждается в собранных функциях. Вы создали один, но не используете его в коде.
- Вы дали имя
features
выводу ассемблера, и у вас уже есть столбец с таким именем. Поскольку вы не используете его, вы не увидите ошибку, но если бы вы были, вы получите это исключение:
java.lang.IllegalArgumentException: Output column features already exists.
При выполнении классификации необходимо указать как минимум входные объекты с помощью
setFeaturesCol
и метку, которую вы пытаетесь изучить с помощью
setLabelCol
. Вы не указали метку, и по умолчанию она называется
"label"
. У вас нет ни одного столбца, названного таким образом, следовательно, исключительная искра на вас.
Вот рабочий пример того, что вы пытаетесь сделать.
// a funky dataset with 3 features (`x1`, `x2`, `x`3) and a label `y`,
// the class we are trying to predict.
val dataset = spark.range(10)
.select('id as "x1", rand() as "x2", ('id * 'id) as "x3")
.withColumn("y", (('x2 * 3 + 'x1) cast "int").mod(2))
.cache()
// splitting the dataset, that part was ok ;-)
val Array(train, test) = dataset
.randomSplit(Array(0.7, 0.3), seed = 1234L)
.map(_.cache())
// An assembler, the output name cannot be one of the inputs.
val assembler = new VectorAssembler()
.setInputCols(Array("x1", "x2", "x3"))
.setOutputCol("features")
// A pca, that part was ok as well
val pca = new PCA()
.setInputCol("features")
.setK(2)
.setOutputCol("pcaFeatures")
// A LogisticRegression classifier. (KNN is not part of spark's standard API, but
// requires the same minimum information: features and label)
val classifier = new LogisticRegression()
.setFeaturesCol("pcaFeatures")
.setLabelCol("y")
// And the full pipeline
val pipeline = new Pipeline().setStages(Array(assembler, pca, classifier))
val model = pipeline.fit(train)