Spark dataframe - передать переменную типа столбца в udf - PullRequest
0 голосов
/ 04 августа 2020

Я пытаюсь изменить столбец вектора «функции», удалив некоторые функции (сохранить в feature_idx_to_wipe). Псевдокод, как показано ниже, проблема в том, что udf не принимает Set. Мне интересно, как это исправить, и есть ли лучший подход.

//data
val feature_idx_to_wipe = Set(1, 2)
val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
dfA.show(false)
+---+-------------------------+
|id |features                 |
+---+-------------------------+
|0  |(6,[0,1,2],[1.0,1.0,1.0])|
|1  |(6,[2,3,4],[1.0,1.0,1.0])|
|2  |(6,[0,2,4],[1.0,1.0,1.0])|
+---+-------------------------+

//udf 
def wipe(v: NewSparseVector, idx2clean:Set[Int]) : NewSparseVector = {
    val lb:ListBuffer[(Int, Double)]=ListBuffer()
    v.foreachActive {
        case (i, v) =>
         if(!idx2clean.contains(i)){
          lb += ((i, v))
        }
    }

    NewVectors.sparse(v.size, lb.toSeq).toSparse 
}
val udf_wipe = udf((x: NewSparseVector, idx2clean:Set[Int]) => wipe(x, idx2clean))

//apply udf
dfA.withColumn("features_wiped", udf_wipe(col("features"), feature_idx_to_wipe))
// error: 
// scala> dfA.withColumn("nf", udf_wipe(col("features"), tc))
// <console>:98: error: type mismatch;
//  found   : scala.collection.immutable.Set[Int]
//  required: org.apache.spark.sql.Column
//        dfA.withColumn("nf", udf_wipe(col("features"), tc))

//target (a new column of vector added, with features at index 1,2 are removed)
dfA.select("id","features_wiped").show(false)
+---+-------------------------+
|id |features_wiped           |
+---+-------------------------+
|0  |(6,[0],[1.0])            |
|1  |(6,[3,4],[1.0,1.0])      |
|2  |(6,[0,4],[1.0,1.0])      |
+---+-------------------------+

Ответы [ 2 ]

0 голосов
/ 04 августа 2020

Другие альтернативы -

тестовые данные

//data
    val dfA = spark.createDataFrame(Seq(
      (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
      (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
      (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
    )).toDF("id", "features")
    dfA.show(false)

    /**
      * +---+-------------------------+
      * |id |features                 |
      * +---+-------------------------+
      * |0  |(6,[0,1,2],[1.0,1.0,1.0])|
      * |1  |(6,[2,3,4],[1.0,1.0,1.0])|
      * |2  |(6,[0,2,4],[1.0,1.0,1.0])|
      * +---+-------------------------+
      */

Альтернатива-1 Используйте lit как показано ниже -

// Alternative-1
    //udf
    val feature_idx_to_wipe = Array(1, 2)
    import org.apache.spark.ml.linalg.{SparseVector => NewSparseVector}
    def wipe(v: NewSparseVector, idx2clean:Seq[Int]) : NewSparseVector = {
      val lb:ListBuffer[(Int, Double)]=ListBuffer()
      v.foreachActive {
        case (i, v) =>
          if(!idx2clean.contains(i)){
            lb += ((i, v))
          }
      }

      Vectors.sparse(v.size, lb.toSeq).toSparse
    }
    val udf_wipe = udf((x: NewSparseVector, idx2clean:Seq[Int]) => wipe(x, idx2clean))

    //apply udf
    val newDF = dfA.withColumn("features_wiped", udf_wipe(col("features"), lit(feature_idx_to_wipe)))

    //target (a new column of vector added, with features at index 1,2 are removed)
    newDF.select("id","features_wiped").show(false)
    /**
      * +---+-------------------+
      * |id |features_wiped     |
      * +---+-------------------+
      * |0  |(6,[0],[1.0])      |
      * |1  |(6,[3,4],[1.0,1.0])|
      * |2  |(6,[0,4],[1.0,1.0])|
      * +---+-------------------+
      */

Альтернатива- 2 Используйте широковещательную переменную sparkcontext.broadcast, как показано ниже -

//    Alternative2
    //data
    val feature_idx_to_wipe1 = Set(1, 2)
    val broabcastSet = spark.sparkContext.broadcast(feature_idx_to_wipe1)

    //udf
    import org.apache.spark.ml.linalg.{SparseVector => NewSparseVector}
    def wipe1(v: NewSparseVector) : NewSparseVector = {
      val idx2clean = broabcastSet.value
      val lb:ListBuffer[(Int, Double)]=ListBuffer()
      v.foreachActive {
        case (i, v) =>
          if(!idx2clean.contains(i)){
            lb += ((i, v))
          }
      }

      Vectors.sparse(v.size, lb.toSeq).toSparse
    }
    val udf_wipe1 = udf((x: NewSparseVector) => wipe1(x))

    //apply udf
    val newDF1 = dfA.withColumn("features_wiped", udf_wipe1(col("features")))

    //target (a new column of vector added, with features at index 1,2 are removed)
    newDF1.select("id","features_wiped").show(false)

    /**
      * +---+-------------------+
      * |id |features_wiped     |
      * +---+-------------------+
      * |0  |(6,[0],[1.0])      |
      * |1  |(6,[3,4],[1.0,1.0])|
      * |2  |(6,[0,4],[1.0,1.0])|
      * +---+-------------------+
      */
0 голосов
/ 04 августа 2020

Очистка функции может быть преобразована в каррированную функцию , написав ее следующим образом:

def wipe(v: NewSparseVector)(idx2clean:Set[Int]) : NewSparseVector

Чтобы создать udf для соответствующей функции:

val udf_wipe = udf((x: NewSparseVector) => wipe(x)(feature_idx_to_wipe))

Наконец, применив udf к фрейму данных:

dfA.withColumn("features_wiped", udf_wipe(col("features")))
...