Как отфильтровать кадр данных в Spark scala с реляционными операторами в качестве переменных? - PullRequest
0 голосов
/ 11 февраля 2020

У меня есть датафрейм, как показано ниже:

myDF:

+-----+
|value|
+-----+
|8    |
|8    |
|1    |
+-----+

Программа читает из другого вычисленного фрейма данных и получает следующие два значения:

val attr = 5
val opr = >

Теперь мне нужно отфильтровать myDF на основе значений. Поэтому мой результат будет таким, как показано ниже:

resultDF:
+-----+----------+
|value|result    |
+-----+----------+
|8    |GOOD      |
|8    |GOOD      |
|1    |BAD       |
+-----+----------+

Код, который я использовал:

val resultDF = myDF.withColumn("result", when(col("value") > attr, "GOOD").otherwise("BAD"))

Теперь attr и opr будут динамически меняться. Значение оператора может быть любым из >, <, >=, <=, <>.

В зависимости от оператора, который я получаю, состояние фильтра должно измениться. Как мне нужно использовать переменную для оператора.

Может кто-нибудь, пожалуйста, посоветовать?

val resultDF = myDF.withColumn("result", when(col("value") opr attr, "GOOD").otherwise("BAD"))

1 Ответ

2 голосов
/ 12 февраля 2020

Во-первых, как сказал @ Эндрю , плохая идея использовать Dynami c sql без особой причины из-за неопределенного поведения и трудностей в отладке. Предположим, что вы объединили значения с оператором dataframe, тогда вы можете использовать этот код:

import spark.implicits._

val appData: DataFrame = Seq(
  ("1", ">"),
  ("1", ">"),
  ("3", "<="),
  ("4", "<>"),
  ("6", ">="),
  ("6", "==")
).toDF("value", "operator")

val attr = 5

def compare(value: String, operator: String, sample: Int): String = {
  val isValueCorrectForAttr: Boolean = operator match {
    case ">" => value.toInt > sample
    case "<" => value.toInt < sample
    case ">=" => value.toInt >= sample
    case "<=" => value.toInt <= sample
    case "==" => value.toInt == sample
    case "<>" => value.toInt != sample
    case _ => throw new IllegalArgumentException(s"Wrong operator: $operator")
  }
  if (isValueCorrectForAttr) "GOOD" else "BAD"
}

import org.apache.spark.sql.functions._
val dynamic_compare =  spark.udf.register("dynamic_compare", (v: String, op: String) => compare(v, op, attr))
appData.withColumn("result", dynamic_compare(col("value"), col("operator")))

, если у вас нет столбца оператора и только один оператор, это может быть проще:

import spark.implicits._

val appData: DataFrame = Seq(
  "1",
  "1",
  "3",
  "4",
  "6",
  "6"
).toDF("value")

val attr = 5
val op = ">"

def compare(value: String, operator: String, sample: Int): String = {
  val isValueCorrectForAttr: Boolean = operator match {
    case ">" => value.toInt > sample
    case "<" => value.toInt < sample
    case ">=" => value.toInt >= sample
    case "<=" => value.toInt <= sample
    case "==" => value.toInt == sample
    case "<>" => value.toInt != sample
    case _ => throw new IllegalArgumentException(s"Wrong operator: $operator")
  }
  if (isValueCorrectForAttr) "GOOD" else "BAD"
}

import org.apache.spark.sql.functions._
val dynamic_compare =  spark.udf.register("dynamic_compare", (value: String) => compare(value, op, attr))
appData.withColumn("result", dynamic_compare(col("value")))
...