Spark: динамический фильтр для набора данных в Scala - PullRequest
0 голосов
/ 21 февраля 2019

У меня есть набор данных (ds), который выглядит как

scala> ds.show()
+----+---+-----+----+-----+--------------+
|name|age|field|optr|value|          rule|
+----+---+-----+----+-----+--------------+
|   a| 75|  age|   <|   18|         Minor|
|   b| 10|  age|   <|   18|         Minor|
|   c| 30|  age|   <|   18|         Minor|
|   a| 75|  age|  >=|   18|         Major|
|   b| 10|  age|  >=|   18|         Major|
|   c| 30|  age|  >=|   18|         Major|
|   a| 75|  age|   >|   60|Senior Citizen|
|   b| 10|  age|   >|   60|Senior Citizen|
|   c| 30|  age|   >|   60|Senior Citizen|
+----+---+-----+----+-----+--------------+

Теперь мне нужно применить фильтр к нему, чтобы получить те строки, которые удовлетворяют условию фильтра, как указано ниже.

  • Применить фильтр к полю в столбце field
  • Операция для выполнения находится в столбце optr, а
  • Значение для сравнения - в столбце value.

Пример: Для первой строки - применить фильтр к столбцу age (здесь все значения полей - это возраст, но он может отличаться), где возраст - меньше(<) значение 18 </em>, равное false, как age = 75 .

Я не знаю, как указать это условие фильтра в scala.Полученный набор данных должен выглядеть как

+----+---+-----+----+-----+--------------+
|name|age|field|optr|value|          rule|
+----+---+-----+----+-----+--------------+
|   b| 10|  age|   <|   18|         Minor|
|   a| 75|  age|  >=|   18|         Major|
|   c| 30|  age|  >=|   18|         Major|
|   a| 75|  age|   >|   60|Senior Citizen|
+----+---+-----+----+-----+--------------+

Ответы [ 2 ]

0 голосов
/ 21 февраля 2019

Проверьте это:

scala> val df = Seq(("a",75,"age","<",18,"Minor"),("b",10,"age","<",18,"Minor"),("c",30,"age","<",18,"Minor"),("a",75,"age",">=",18,"Major"),("b",10,"age",">=",18,"Major"),("c",30,"age",">=",18,"Major"),("a",75,"age",">",60,"Senior Citizen"),("b",10,"age",">",60,"Senior Citizen"),("c",30,"age",">",60,"Senior Citizen")).toDF("name","age","field","optr","value","rule")
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 4 more fields]

scala> df.show(false)
+----+---+-----+----+-----+--------------+
|name|age|field|optr|value|rule          |
+----+---+-----+----+-----+--------------+
|a   |75 |age  |<   |18   |Minor         |
|b   |10 |age  |<   |18   |Minor         |
|c   |30 |age  |<   |18   |Minor         |
|a   |75 |age  |>=  |18   |Major         |
|b   |10 |age  |>=  |18   |Major         |
|c   |30 |age  |>=  |18   |Major         |
|a   |75 |age  |>   |60   |Senior Citizen|
|b   |10 |age  |>   |60   |Senior Citizen|
|c   |30 |age  |>   |60   |Senior Citizen|
+----+---+-----+----+-----+--------------+

scala> val df2 = df.withColumn("condn", concat('field,'optr,'value))
df2: org.apache.spark.sql.DataFrame = [name: string, age: int ... 5 more fields]

scala> val condn_list=df2.groupBy().agg(collect_set('condn).as("condns")).as[(Seq[String])].first
condn_list: Seq[String] = List(age>60, age<18, age>=18)

scala>  val df_filters = condn_list.map{ x => df2.filter(s""" condn='${x}' and $x """) }
df_filters: Seq[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = List([name: string, age: int ... 5 more fields], [name: string, age: int ... 5 more fields], [name: string, age: int ... 5 more fields])

scala> df_filters(0).union(df_filters(1)).union(df_filters(2)).show(false)
+----+---+-----+----+-----+--------------+-------+
|name|age|field|optr|value|rule          |condn  |
+----+---+-----+----+-----+--------------+-------+
|b   |10 |age  |<   |18   |Minor         |age<18 |
|a   |75 |age  |>   |60   |Senior Citizen|age>60 |
|a   |75 |age  |>=  |18   |Major         |age>=18|
|c   |30 |age  |>=  |18   |Major         |age>=18|
+----+---+-----+----+-----+--------------+-------+


scala>

Чтобы получить союзы, вы можете сделать что-то вроде

scala> var res = df_filters(0)
res: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, age: int ... 5 more fields]

scala> (1 until df_filters.length).map( x => { res = res.union(df_filters(x)) } )
res20: scala.collection.immutable.IndexedSeq[Unit] = Vector((), ())

scala> res.show(false)
+----+---+-----+----+-----+--------------+-------+
|name|age|field|optr|value|rule          |condn  |
+----+---+-----+----+-----+--------------+-------+
|b   |10 |age  |<   |18   |Minor         |age<18 |
|a   |75 |age  |>   |60   |Senior Citizen|age>60 |
|a   |75 |age  |>=  |18   |Major         |age>=18|
|c   |30 |age  |>=  |18   |Major         |age>=18|
+----+---+-----+----+-----+--------------+-------+


scala>
0 голосов
/ 21 февраля 2019

Решение как ниже -

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.Row
import scala.collection.mutable

val encoder = RowEncoder(df.schema);
df.flatMap(row => {
    val result = new mutable.MutableList[Row];
    val ruleField = row.getAs[String]("field");
    val ruleValue = row.getAs[Int]("value");
    val ruleOptr = row.getAs[String]("optr");
    val rowField = row.getAs[Int](ruleField);
    val condition = {ruleOptr match{
        case "=" => rowField == ruleValue;
        case "<" => rowField < ruleValue;
        case "<=" => rowField <= ruleValue;
        case ">" => rowField > ruleValue;
        case ">=" => rowField >= ruleValue;
        case _ => false;
        }
    };
    if (condition){
        result+=row;
    };
    result;
})(encoder).show();
...