Spark: агрегация с динамическим фильтром на фрейме данных в Scala - PullRequest
0 голосов
/ 30 мая 2019

У меня есть такой фрейм данных, как

scala> testDf.show()
+------+--------+---------+------------+----------------------------------------+
|    id|    item|    value|  value_name|                               condition|
+------+--------+---------+------------+----------------------------------------+
|    11|    3210|        0|         OFF|                                value==0|
|    12|    3210|        1|         OFF|                                value==0|
|    13|    3210|        0|         OFF|                                value==0|
|    14|    3210|        0|         OFF|                                value==0|
|    15|    3210|        1|         OFF|                                value==0|
|    16|    5440|        5|          ON|                     value>0 && value<10|
|    17|    5440|        0|          ON|                     value>0 && value<10|
|    18|    5440|        6|          ON|                     value>0 && value<10|
|    19|    5440|        7|          ON|                     value>0 && value<10|
|    20|    5440|        0|          ON|                     value>0 && value<10|
|    21|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    22|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    23|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    24|    7780|        C|        TYPE|   Set("A","B").contains(value.toString)|
|    25|    7780|        C|        TYPE|   Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+

scala> testDf.printSchema
root
 |-- id: string (nullable = true)
 |-- item: string (nullable = true)
 |-- value: string (nullable = true)
 |-- value_name: string (nullable = true)
 |-- condition: string (nullable = true)

Я хочу удалить некоторые строки со столбцом 'condition'.Но у меня проблемы.

Я пробовал с тестовым кодом ниже.Но, похоже, он не работает должным образом.

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.Row
import scala.collection.mutable

val encoder = RowEncoder(testDf.schema);

testDf.flatMap(row => {
  val result = new mutable.MutableList[Row];
  val setting_value = row.getAs[String]("setting_value").toInt
  val condition = row.getAs[String]("condition").toBoolean
  if (condition){
      result+=row;
  };
  result;
})(encoder).show();

И это ошибка.

19/05/30 02:04:31 ERROR TaskSetManager: Task 0 in stage 267.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 267.0 failed 4 times, most recent failure: Lost task 0.3 in stage 267.0 (TID 3763, .compute.internal, executor 1): java.lang.IllegalArgumentException: For input string: "setting_value==0"
        at scala.collection.immutable.StringLike$class.parseBoolean(StringLike.scala:291)
        at scala.collection.immutable.StringLike$class.toBoolean(StringLike.scala:261)
        at scala.collection.immutable.StringOps.toBoolean(StringOps.scala:29)
        at $anonfun$1.apply(<console>:40)
        at $anonfun$1.apply(<console>:37)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Я хочу сохранить строки, соответствующие значению столбца условия.Это желаемый результат.

+------+--------+---------+------------+----------------------------------------+
|    id|    item|    value|  value_name|                               condition|
+------+--------+---------+------------+----------------------------------------+
|    11|    3210|        0|         OFF|                                value==0|
|    13|    3210|        0|         OFF|                                value==0|
|    14|    3210|        0|         OFF|                                value==0|
|    16|    5440|        5|          ON|                     value>0 && value<10|
|    18|    5440|        6|          ON|                     value>0 && value<10|
|    19|    5440|        7|          ON|                     value>0 && value<10|
|    21|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    22|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
|    23|    7780|        A|        TYPE|   Set("A","B").contains(value.toString)|
+------+--------+---------+------------+----------------------------------------+

Пожалуйста, помогите мне, если у вас есть хорошая идея.Спасибо.

Ответы [ 2 ]

1 голос
/ 30 мая 2019

Вот один из способов использования scala reflections API с функцией UDF. Udf обрабатывает оба случая для значений int и string:

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val tb = currentMirror.mkToolBox()

val df = Seq(("0","value==0"),
("1", "value==0"),
("6", """value>0 && value<10"""),
("7", """value>0 && value<10"""),
("0", """value>0 && value<10"""),
("A", """Set("A","B").contains(value.toString)"""),
("C", """Set("A","B").contains(value.toString)""")).toDF("value", "condition")

def isAllDigits(x: String) = x.forall(Character.isDigit)

val evalExpressionUDF = udf((value: String, expr: String) => {
  val result =  isAllDigits(value) match {
    case true => tb.eval(tb.parse(expr.replace("value", s"""${value.toInt}""")))
    case false => tb.eval(tb.parse(expr.replace("value", s""""${value}"""")))
  }

  result.asInstanceOf[Boolean]
})

df.withColumn("eval", evalExpressionUDF($"value", $"condition"))
  .where($"eval" === true)
  .show(false)

Чехлы для evalExpressionUDF:

  • int: заменить выражение фактическим значением int, затем выполнить строковый код с mkToolBox
  • string: заключить строковое значение в "", затем заменить выражение строкой в ​​двойных кавычках и выполнить строковый код

Выход:

+-----+-------------------------------------+----+ 
|value|                           condition |eval| 
+-----+-------------------------------------+----+ 
|0    |value==0                             |true| 
|6    |value>0 && value<10                  |true| 
|7    |value>0 && value<10                  |true| 
|A    |Set("A","B").contains(value.toString)|true| 
+-----+-------------------------------------+----+

PS: я знаю, что производительность вышеупомянутого решения может быть плохой, поскольку он вызывает отражение, хотя я не знаю альтернативы.

1 голос
/ 30 мая 2019

В приведенном выше случае Spark пытается преобразовать значение String в Boolean.Это не оценка самого выражения.
И оценка выражения должна выполняться пользователем с использованием внешней библиотеки или пользовательского кода.
Ближайший (хотя и не точный сценарий), который я мог бы придумать, -
Как вычислить математическое выражение в виде строки? .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...