Scala: фильтр с различными условиями, указанными кортежами в наборе - PullRequest
0 голосов
/ 09 февраля 2020

У меня есть СДР с полем 1, содержащим название лекарства, и полем 2, содержащим соответствующую дозировку этого лекарства. Я пытаюсь отфильтровать этот СДР на основе нескольких критериев, сохраненных в наборе кортежей, например:

val MyCriteria = Set(("drug a", ">", 1.2), ("drug b", ">=", 4.5), ("drug c", "<", 6.3))

Я думаю, что я могу сделать что-то вроде:

val rslt = rdd.filter(x => MyCriteria.foreach(x.field1 == _._1 && x.field2 _._2 _._3))

Но я не знаю, как преобразовать 2-й элемент кортежа (строки) в реальные операторы, которые scala понимает. Выдает сообщение об ошибке:

<console>:1: error: ')' expected but '.' found.
    val rslt = rdd.filter(x => MyCriteria.foreach(x.field1 == _._1 && x.field2 _._2 _._3))
                                                                                ^

Или что может быть лучше для реализации фильтра?

Ответы [ 2 ]

1 голос
/ 09 февраля 2020

Это не будет работать так, Scala строковый литерал не будет переведен в оператор. Вместо этого вам нужно использовать функцию для сравнения значений из СДР с результатами фильтра.

Пожалуйста, см. Пример кода ниже:

type Compare[T : Numeric] = (T, T) => Boolean
type DoubleCompare = Compare[Double]
val > : DoubleCompare = _ > _
val < : DoubleCompare = _ < _
val >= : DoubleCompare = _ >= _

val myCriteria: Set[(String, DoubleCompare, Double)] = Set (
    ("drug a", > , 1.2),
    ("drug b", >=, 4.5),
    ("drug c", <,  6.3)
)

rdd.filter { x =>
    val fieldName = x.field1
    val fieldValue = x.field2
    myCriteria.foreach {
        case (filterFieldName, filter, filterValue) =>
            (fieldName == filterFieldName) && filter(fieldValue, filterValue)
    }
}

Надеюсь, это поможет!

0 голосов
/ 09 февраля 2020

Если вы можете использовать DataFrame вместо RDD, тогда вы можете использовать выражение where со строкой, которое будет обрабатывать все типы данных, и вам не нужно кодировать их.

    import spark.implicits._
    val df = spark.createDataset(Seq(("drug a",5),("drug b",4),("drug c",4))).toDF("drug","dose")
    val criteria = Set(("drug a", ">", 1.2), ("drug b", ">=", 4.5), ("drug c", "<", 6.3))

    df.show()

    val criteriaExp = criteria.foldLeft(""){(cstr, cset)=>
      if(cstr == "")
        s"(drug = '${cset._1}' and dose ${cset._2} ${cset._3})"
      else
        s"$cstr or (drug = '${cset._1}' and dose ${cset._2} ${cset._3})"
    }

    println(criteriaExp)
    df.where(criteriaExp).show()

Результат

+------+----+
|  drug|dose|
+------+----+
|drug a|   5|
|drug b|   4|
|drug c|   4|
+------+----+

criteria string - (drug = 'drug a' and dose > 1.2) or (drug = 'drug b' and dose >= 4.5) or (drug = 'drug c' and dose < 6.3)

+------+----+
|  drug|dose|
+------+----+
|drug a|   5|
|drug c|   4|
+------+----+
...