Мне необходимо использовать Spark 2.1.1 и иметь простой вариант использования ML, в котором я подгоняю регрессию logisti c для выполнения классификации на основе как непрерывных, так и категориальных переменных.
Я автоматически определяю категориальные переменные и индексировать их в конвейере ML. Однако когда я пытаюсь применить горячее кодирование к каждой из моих переменных (значение oneHotEncodersStages в приведенном ниже коде), это приводит к ошибке при создании конвейера:
Ошибка: (48, 118) несоответствие типов; Найдено: Array [java .io.Serializable] обязательно: Array [_ <: org. apache .spark.ml.PipelineStage] Примечание: java .io.Serializable>: org. apache .spark. ml.PipelineStage, но класс Array инвариантен в типе T. Вы можете wi sh, чтобы исследовать подстановочный тип, такой как _ >: org.apache.spark.ml.PipelineStage
. (SLS 3.2.10)
val pipe = new Pipeline (). SetStages (stringIndexerStages: + oneHotEncodersStages: + indexer: + ассемблер: + lr: + indexToLabel)
Я не понимаю, как чтобы решить эту ошибку ... какие-либо советы? Ниже приведен минимальный рабочий пример
import spark.implicits._
val df = Seq(
("automatic","Honda",200,"Cheap"),
("semi-automatic","Ford",240,"Expensive")
).toDF("cat_type","cat_brand","Speed","label")
def onlyFeatureCols(c: String): Boolean = !(c matches "id|label") // Function to select only feature columns (omit id and label)
def isCateg(c: String): Boolean = c.startsWith("cat")
def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c
def isIdx(c: String): Boolean = c.startsWith("idx")
def oneHotNewCol(c: String): String = if (isIdx(c)) s"vec_${c}" else c
val featuresNames = df.columns
.filter(onlyFeatureCols)
.map(categNewCol)
val stringIndexerStages = df.columns.filter(isCateg)
.map(c => new StringIndexer()
.setInputCol(c)
.setOutputCol(categNewCol(c))
.fit(df.select(c))
)
val oneHotEncodersStages = df.columns.filter(isIdx)
.map(c => new OneHotEncoder()
.setInputCol(c)
.setOutputCol(oneHotNewCol(c)))
val indexer = new StringIndexer().setInputCol("label").setOutputCol("labels").fit(df)
val indexToLabel = new IndexToString().setInputCol("prediction").setOutputCol("predicted_label").setLabels(indexer.labels)
val assembler = new VectorAssembler().setInputCols(featuresNames).setOutputCol("features")
val lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("labels")
val pipeline = new Pipeline().setStages(stringIndexerStages :+ oneHotEncodersStages ++ indexer :+ assembler :+ lr :+ indexToLabel)