scala массив фильтров искровой UDF структуры - PullRequest
2 голосов
/ 31 января 2020

У меня есть фрейм данных со схемой

root
 |-- x: Long (nullable = false)
 |-- y: Long (nullable = false)
 |-- features: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- score: double (nullable = true)

Например, у меня есть данные

+--------------------+--------------------+------------------------------------------+
|                x   |              y     |       features                           |
+--------------------+--------------------+------------------------------------------+
|10                  |          9         |[["f1", 5.9], ["ft2", 6.0], ["ft3", 10.9]]|
|11                  |          0         |[["f4", 0.9], ["ft1", 4.0], ["ft2", 0.9] ]|
|20                  |          9         |[["f5", 5.9], ["ft2", 6.4], ["ft3", 1.9] ]|
|18                  |          8         |[["f1", 5.9], ["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+------------------------------------------+

Я бы хотел отфильтровать объекты с определенным префиксом, скажем, "ft", так что в конечном итоге я хочу получить результат:

+--------------------+--------------------+-----------------------------+
|                x   |              y     |       features              |
+--------------------+--------------------+-----------------------------+
|10                  |          9         |[["ft2", 6.0], ["ft3", 10.9]]|
|11                  |          0         |[["ft1", 4.0], ["ft2", 0.9] ]|
|20                  |          9         |[["ft2", 6.4], ["ft3", 1.9] ]|
|18                  |          8         |[["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+-----------------------------+

Я не использую Spark 2.4+, поэтому я не могу использовать приведенное здесь решение: Массив структур Spark (Scala) без разнесения

Я пытался использовать UDF, но все равно не работает. Вот мои попытки. Я определяю UDF:

def filterFeature: UserDefinedFunction = 
udf((features: Seq[Row]) =>
    features.filter{
        x.getString(0).startsWith("ft")
    }
)

Но если я применяю этот UDF

df.withColumn("filtered", filterFeature($"features"))

, я получаю ошибку Schema for type org.apache.spark.sql.Row is not supported. Я обнаружил, что не могу вернуть Row из UDF. Затем я попытался

def filterFeature: UserDefinedFunction = 
udf((features: Seq[Row]) =>
    features.filter{
        x.getString(0).startsWith("ft")
    }, (StringType, DoubleType)
)

Затем я получил ошибку:

 error: type mismatch;
 found   : (org.apache.spark.sql.types.StringType.type, org.apache.spark.sql.types.DoubleType.type)
 required: org.apache.spark.sql.types.DataType
              }, (StringType, DoubleType)
                 ^

Я также попробовал класс случая, как было предложено некоторыми ответами:

case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction = 
udf((features: Seq[Row]) =>
    features.filter{
        x.getString(0).startsWith("ft")
    }, FilteredFeature
)

Но я получил:

 error: type mismatch;
 found   : FilteredFeature.type
 required: org.apache.spark.sql.types.DataType
              }, FilteredFeature
                 ^

Я пытался:

case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction = 
udf((features: Seq[Row]) =>
    features.filter{
        x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature]
)

Я получил:

<console>:192: error: missing argument list for method apply in class GenericCompanion
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `apply _` or `apply(_)` instead of `apply`.
              }, Seq[FilteredFeature]
                    ^

Я пытался:

case class FilteredFeature(featureName: String, featureScore: Double)
def filterFeature: UserDefinedFunction = 
udf((features: Seq[Row]) =>
    features.filter{
        x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature](_)
)

Я получил:

<console>:201: error: type mismatch;
 found   : Seq[FilteredFeature]
 required: FilteredFeature
              }, Seq[FilteredFeature](_)
                          ^

Что мне делать в этом случае?

Ответы [ 3 ]

2 голосов
/ 31 января 2020

У вас есть два варианта:

a) предоставить схему для UDF, это позволяет вам вернуть Seq[Row]

b) преобразовать Seq[Row] в Seq из Tuple2 или класс case, тогда вам не нужно предоставлять схему (но имена структурных полей теряются, если вы используете Tuples!)

Я бы предпочел вариант a) для вашего случая (хорошо работает для структур с много полей):

val schema = df.schema("features").dataType

val filterFeature = udf((features:Seq[Row]) => features.filter(_.getAs[String]("name").startsWith("ft")),schema)
0 голосов
/ 31 января 2020

Если вы не используете Spark 2.4, это должно работать в вашем случае

case class FilteredFeature(featureName: String, featureScore: Double)

import org.apache.spark.sql.functions._  
def filterFeature: UserDefinedFunction = udf((feature: Seq[Row]) => {
  feature.filter(x => {
    x.getString(0).startsWith("ft")
  }).map(r => FilteredFeature(r.getString(0), r.getDouble(1)))
})

df.select($"x", $"y", filterFeature($"feature") as "filter").show(false)

Вывод:

+---+---+-----------------------+
|x  |y  |filter                 |
+---+---+-----------------------+
|10 |9  |[[ft2,6.0], [ft3,10.9]]|
|11 |0  |[[ft1,4.0], [ft2,0.9]] |
|20 |9  |[[ft2,6.4], [ft3,1.9]] |
|18 |8  |[[ft4,8.1], [ft2,18.9]]|
+---+---+-----------------------+
0 голосов
/ 31 января 2020

Попробуйте это:

def filterFeature: UserDefinedFunction =
    udf((features: Row) => {
      features.getAs[Array[Array[Any]]]("features").filter(in => in(0).asInstanceOf[String].startsWith("ft"))
 })
...