Это может быть достигнуто динамически, если подходить к созданию условий when
несколько абстрактно.
Чтобы сделать его более читабельным, давайте создадим выделенный объект-обертку:
case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)
Поле
valuesToCheck
содержит последовательность кортежей для всех применимых условий для генерируемого выражения. Первый элемент каждого кортежа - это имя столбца, второй - значение, с которым нужно сопоставить. Например: ("CoreSectorLevel1Code", "Derivatives")
.
valueIfMatches
соответствует второму аргументу, передаваемому when
: "1", "3" или "4" из примера.
Нам понадобится функция, которая считывает условия из исходной таблицы и возвращает последовательность WhenCondition
экземпляров:
private def readConditions(args: Any*): Seq[WhenCondition] = {
??? // impl depends on how you read the source data
}
// for the example of the question, this function should return:
val conditions = Seq(
WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "Caps"), ("FAS157Flavor", "SPRD")), "1"),
WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("FAS157Flavor", "TRSY"), ("DerivativeType", "TROR")), "3"),
WhenCondition(Seq(("InDefaultInd", "Y")), "4")
)
Обновление
Нам также понадобится функция, которая производит цепочку из when
вызовов:
private def chainWhen(chained: Column, remaining: Seq[(Column, Any)]): Column =
remaining match {
case Nil => chained
case head :: tail => chainWhen(chained.when(head._1, head._2), tail)
}
Теперь мы создаем функцию dynamicWhen
, которая преобразует последовательность WhenCondition
:
private def dynamicWhen(parsedConditions: Seq[WhenCondition]): Option[Column] = {
// first, we transform a WhenCondition object into a tuple of args (Column, Any) for the target "when" function
val conditions = parsedConditions.map(whenCondition => {
val condition = whenCondition.valuesToCheck
.map(cond => col(cond._1) === cond._2)
.reduce(_ && _)
(condition, whenCondition.valueIfMatches)
})
// if there weren't any input conditions, we return None, otherwise we chain the transformation and wrap it into Some
conditions match {
case Nil => None
case head :: tail => Some(chainWhen(when(head._1, head._2), tail))
}
}
И оригинальный жестко закодированный вызов можно заменить на
// produce the optional chained dynamic "whens".
val whenConditions = dynamicWhen(conditions)
// map it in a DataFrame with a new column or keep the original one, if there were no "whens".
val result = whenConditions.map(cond => df.withColumn("AMGClassRule", cond))
.getOrElse(df)
Наконец, короткий тест с поддельными данными:
val df = Seq(
("Derivatives", "Caps", "SPRD", "x", "N"),
("Derivatives", "Caps", "SPRD", "TROR", "Y"),
("Derivatives", "Caps", "TRSY", "x", "Y"),
("Derivatives", "Caps", "TRSY", "TROR", "N"),
("Derivatives", "Caps", "zzzz", "x", "N")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "DerivativeType", "InDefaultInd")
val result = ... // transformations above
result.show(false)
+--------------------+--------------------+------------+--------------+------------+------------+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|DerivativeType|InDefaultInd|AMGClassRule|
+--------------------+--------------------+------------+--------------+------------+------------+
|Derivatives |Caps |SPRD |x |N |1 |
|Derivatives |Caps |SPRD |TROR |Y |1 |
|Derivatives |Caps |TRSY |x |Y |4 |
|Derivatives |Caps |TRSY |TROR |N |3 |
|Derivatives |Caps |zzzz |x |N |null |
+--------------------+--------------------+------------+--------------+------------+------------+
Заключительная обновление
Обновление 2
Пример чтения условий из DataFrame.
Предположим, что в DataFrame хранятся следующие описания условий:
val rulesDf = Seq(
("Derivatives", "%", "%", "16"),
("Derivatives", "Fx Options", "%", "17"),
("Derivatives", "Futures", "%", "48")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "rule")
rulesDf.show(false)
+--------------------+--------------------+------------+----+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|rule|
+--------------------+--------------------+------------+----+
|Derivatives |% |% |16 |
|Derivatives |Fx Options |% |17 |
|Derivatives |Futures |% |48 |
+--------------------+--------------------+------------+----+
Мы можем читать и преобразовывать их в WhenCondition
упаковщики, используя следующее:
private def readConditions(): Seq[WhenCondition] = {
val ruleColumnName = "rule"
val ruleColumnIndex = rulesDf.schema.fieldIndex(ruleColumnName)
val conditionColumns = rulesDf.schema.fieldNames.filter(_ != ruleColumnName).toSeq
rulesDf.rdd.map(row => {
val valuesToCheck = conditionColumns.map(colName => (colName, row.get(row.fieldIndex(colName))))
val rule = row(ruleColumnIndex)
WhenCondition(valuesToCheck, rule)
}).collect().toSeq
}
readConditions().foreach(println)
// outputs:
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,%), (FAS157Flavor,%)),16)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Fx Options), (FAS157Flavor,%)),17)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Futures), (FAS157Flavor,%)),48)
Конец обновления 2