Исключение: объекты должны иметь тип org. apache .spark.ml.linalg.VectorUDT - PullRequest
0 голосов
/ 28 января 2020

Я хочу запустить PCA с KNN в искре. У меня есть файл, который содержит идентификатор, функции.

> KNN.printSchema
root
|-- id: int (nullable = true)
|-- features: double (nullable = true)

код:

val dataset =  spark.read.parquet("/usr/local/spark/dataset/data/user")
val features = new VectorAssembler()
    .setInputCols(Array("id", "features" ))
    .setOutputCol("features")
val Array(train, test) = dataset
      .randomSplit(Array(0.7, 0.3), seed = 1234L)
      .map(_.cache())

//create PCA matrix to reduce feature dimensions
val pca = new PCA()
      .setInputCol("features")
      .setK(5)
      .setOutputCol("pcaFeatures")
val knn = new KNNClassifier()
      .setTopTreeSize(dataset.count().toInt / 5)
      .setFeaturesCol("pcaFeatures")
      .setPredictionCol("predicted")
      .setK(1)
val pipeline = new Pipeline()
      .setStages(Array(pca, knn))
      .fit(train)

Над блоком кода выдает это исключение

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually ArrayType(DoubleType,true).
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
    at org.apache.spark.ml.feature.PCAParams$class.validateAndTransformSchema(PCA.scala:54)
    at org.apache.spark.ml.feature.PCAModel.validateAndTransformSchema(PCA.scala:125)
    at org.apache.spark.ml.feature.PCAModel.transformSchema(PCA.scala:162)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
    at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:180)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
    at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:180)
    at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:70)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:132)
    at KNN$.main(KNN.scala:63)
    at KNN.main(KNN.scala)

1 Ответ

1 голос
/ 31 января 2020

По сути, вы пытаетесь разделить набор данных на обучение и тестирование, собрать функции, запустить PCA, а затем классификатор, чтобы что-то предсказать. Общая логика c верна, но с вашим кодом есть несколько проблем.

  1. PCA в искре нуждается в собранных функциях. Вы создали один, но не используете его в коде.
  2. Вы дали имя features выводу ассемблера, и у вас уже есть столбец с таким именем. Поскольку вы не используете его, вы не увидите ошибку, но если бы вы были, вы получите это исключение:
java.lang.IllegalArgumentException: Output column features already exists.
При выполнении классификации необходимо указать как минимум входные объекты с помощью setFeaturesCol и метку, которую вы пытаетесь изучить с помощью setLabelCol. Вы не указали метку, и по умолчанию она называется "label". У вас нет ни одного столбца, названного таким образом, следовательно, исключительная искра на вас.

Вот рабочий пример того, что вы пытаетесь сделать.

// a funky dataset with 3 features (`x1`, `x2`, `x`3) and a label `y`,
// the class we are trying to predict.
val dataset = spark.range(10)
    .select('id as "x1", rand() as "x2", ('id * 'id) as "x3")
    .withColumn("y", (('x2 * 3 + 'x1) cast "int").mod(2))
    .cache()

// splitting the dataset, that part was ok ;-)
val Array(train, test) = dataset
      .randomSplit(Array(0.7, 0.3), seed = 1234L)
      .map(_.cache())

// An assembler, the output name cannot be one of the inputs.
val assembler = new VectorAssembler()
    .setInputCols(Array("x1", "x2", "x3"))
    .setOutputCol("features")

// A pca, that part was ok as well
val pca = new PCA()
    .setInputCol("features")
    .setK(2)
    .setOutputCol("pcaFeatures")

// A LogisticRegression classifier. (KNN is not part of spark's standard API, but
// requires the same minimum information: features and label)
val classifier = new LogisticRegression()
    .setFeaturesCol("pcaFeatures")
    .setLabelCol("y")

// And the full pipeline
val pipeline = new Pipeline().setStages(Array(assembler, pca, classifier))
val model = pipeline.fit(train)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...