Spark ML вставляет / вставляет пользовательский OneHotEncoder в конвейер - PullRequest
2 голосов
/ 27 февраля 2020

Скажем, у меня есть несколько объектов / столбцов в кадре данных, к которым я применяю обычный OneHotEncoder, и один (пусть, n-й) столбец, к которому мне нужно применить свой пользовательский OneHotEncoder. Затем мне нужно использовать VectorAssembler, чтобы собрать эти функции, и поместить в конвейер, наконец подгоняя мои trainData и получая прогнозы из моих testData, например:

val sIndexer1 = new StringIndexer().setInputCol("my_feature1").setOutputCol("indexed_feature1")
// ... let, n-1 such sIndexers for n-1 features
val featureEncoder = new OneHotEncoderEstimator().setInputCols(Array(sIndexer1.getOutputCol), ...).
      setOutputCols(Array("encoded_feature1", ... ))

// **need to insert output from my custom OneHotEncoder function (please see below)**
// (which takes the n-th feature as input) in a way that matches the VectorAssembler below

val vectorAssembler = new VectorAssembler().setInputCols(featureEncoder.getOutputCols + ???).
      setOutputCol("assembled_features")

...

val pipeline = new Pipeline().setStages(Array(sIndexer1, ...,featureEncoder, vectorAssembler, myClassifier))
val model = pipeline.fit(trainData)
val predictions = model.transform(testData)

Как я могу изменить построение vectorAssembler, чтобы он мог принимать выходные данные из пользовательского OneHotEncoder? Проблема в том, что мой желаемый oheEncodingTopN () не может / не должен ссылаться на «фактический» фрейм данных , поскольку он будет частью конвейер (для применения к trainData / testData).

Примечание:

Я проверил, что пользовательский OneHotEncoder (см. ссылка ) работает, как и ожидалось, отдельно, например, на trainData. По сути, oheEncodingTopN применяет OneHotEncoding к входному столбцу, но только для верхних N частых значений (например, N = 50) и помещает все остальные нечастые значения в фиктивный столбец (скажем, «по умолчанию»), например:

val oheEncoded = oheEncodingTopN(df, "my_featureN", 50)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.Column


def flip(col: Column): Column = when(col === 1, lit(0)).otherwise(lit(1))

def oheEncodingTopN(df: DataFrame, colName: String, n: Int): DataFrame = {
  df.createOrReplaceTempView("data")
  val topNDF = spark.sql(s"select $colName, count(*) as count from data group by $colName order by count desc limit $n")

  val pivotTopNDF = topNDF.
    groupBy(colName).
    pivot(colName).
    count().
    withColumn("default", lit(1))

  val joinedTopNDF = df.join(pivotTopNDF, Seq(colName), "left").drop(colName)

  val oheEncodedDF = joinedTopNDF.
    na.fill(0, joinedTopNDF.columns).
    withColumn("default", flip(col("default")))

   oheEncodedDF

}

1 Ответ

2 голосов
/ 27 февраля 2020

Я думаю, что самым чистым способом было бы создать свой собственный класс, расширяющий Spark ML Transformer, чтобы вы могли играть с ним так же, как и с любым другим преобразователем (например, OneHotEncoder). Ваш класс будет выглядеть следующим образом:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.Param
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Column}

class OHEncodingTopN(n :Int, override val uid: String) extends Transformer {
  final val inputCol= new Param[String](this, "inputCol", "The input column")
  final val outputCol = new Param[String](this, "outputCol", "The output column")

 ; def setInputCol(value: String): this.type = set(inputCol, value)

  def setOutputCol(value: String): this.type = set(outputCol, value)

  def this(n :Int) = this(n, Identifiable.randomUID("OHEncodingTopN"))

  def copy(extra: ParamMap): OHEncodingTopN = {
    defaultCopy(extra)
  }

  override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is what you want if needed 
    //     val idx = schema.fieldIndex($(inputCol))
    //     val field = schema.fields(idx)
    //     if (field.dataType != StringType) {
    //       throw new Exception(s"Input type ${field.dataType} did not match input type StringType")
    //     }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
  }
  def flip(col: Column): Column = when(col === 1, lit(0)).otherwise(lit(1))

  def transform(df: Dataset[_]): DataFrame = {
      df.createOrReplaceTempView("data")
      val colName = $(inputCol)
      val topNDF = df.sparkSession.sql(s"select $colName, count(*) as count from data group by $colName order by count desc limit $n")

      val pivotTopNDF = topNDF.
        groupBy(colName).
        pivot(colName).
        count().
        withColumn("default", lit(1))

      val joinedTopNDF = df.join(pivotTopNDF, Seq(colName), "left").drop(colName)

      val oheEncodedDF = joinedTopNDF.
        na.fill(0, joinedTopNDF.columns).
        withColumn("default", flip(col("default")))

       oheEncodedDF
  }
}

Теперь для объекта OHEncodingTopN вы сможете вызывать .getOuputCol, чтобы выполнить то, что вы хотите. Удачи.

EDIT: Ваш метод, который я только что скопировал, вставил в метод transform, должен быть немного изменен, чтобы вывести столбец типа Vector с именем, заданным в setOutputCol.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...