фрейм данных с динамическим условием использования с колонкой - PullRequest
0 голосов
/ 10 мая 2018

У меня есть код Scala для вычисления нового столбца с использованием функции withcolumn в моем жестко запрограммированном коде, который выглядит примерно так

 combinedinputdf
  .withColumn("AMGClassRule", when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "1")
      //.when(col("CoreSectorLevel1Code") === "Derivatives" && col("CoreSectorLevel2Code") === "Caps" && col("FAS157Flavor") === "SPRD", "2")
      .when(col("CoreSectorLevel1Code") === "Derivatives" && col("FAS157Flavor") === "TRSY" && col("DerivativeType") === "TROR", "3")
      .when(col("InDefaultInd") === "Y" , "4")

Это работает как ожидалось.

Но я хочу динамически добавлять или изменять условие when при выполнении на основе таблицы или CSV

вторая таблица выглядит следующим образом Tables with rules

так что во время выполнения я могу прочитать эту таблицу либо в dataframe, либо в Map, перебрать правила таблицы и присвоить значение своему выводу

Как мне сделать это динамически?

1 Ответ

0 голосов
/ 11 мая 2018

Это может быть достигнуто динамически, если подходить к созданию условий when несколько абстрактно.

Чтобы сделать его более читабельным, давайте создадим выделенный объект-обертку:

case class WhenCondition(valuesToCheck: Seq[(String, Any)], valueIfMatches: Any)
Поле

valuesToCheck содержит последовательность кортежей для всех применимых условий для генерируемого выражения. Первый элемент каждого кортежа - это имя столбца, второй - значение, с которым нужно сопоставить. Например: ("CoreSectorLevel1Code", "Derivatives").

valueIfMatches соответствует второму аргументу, передаваемому when: "1", "3" или "4" из примера.

Нам понадобится функция, которая считывает условия из исходной таблицы и возвращает последовательность WhenCondition экземпляров:

private def readConditions(args: Any*): Seq[WhenCondition] = {
  ??? // impl depends on how you read the source data
}

// for the example of the question, this function should return:
val conditions = Seq(
  WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("CoreSectorLevel2Code", "Caps"), ("FAS157Flavor", "SPRD")), "1"),
  WhenCondition(Seq(("CoreSectorLevel1Code", "Derivatives"), ("FAS157Flavor", "TRSY"), ("DerivativeType", "TROR")), "3"),
  WhenCondition(Seq(("InDefaultInd", "Y")), "4")
)

Обновление

Нам также понадобится функция, которая производит цепочку из when вызовов:

private def chainWhen(chained: Column, remaining: Seq[(Column, Any)]): Column =
  remaining match {
    case Nil => chained
    case head :: tail => chainWhen(chained.when(head._1, head._2), tail)
  }

Теперь мы создаем функцию dynamicWhen, которая преобразует последовательность WhenCondition:

private def dynamicWhen(parsedConditions: Seq[WhenCondition]): Option[Column] = {
  // first, we transform a WhenCondition object into a tuple of args (Column, Any) for the target "when" function
  val conditions = parsedConditions.map(whenCondition => {
    val condition = whenCondition.valuesToCheck
                                 .map(cond => col(cond._1) === cond._2)
                                 .reduce(_ && _)
    (condition, whenCondition.valueIfMatches)
  })
  // if there weren't any input conditions, we return None, otherwise we chain the transformation and wrap it into Some
  conditions match {
    case Nil => None
    case head :: tail => Some(chainWhen(when(head._1, head._2), tail))
  }
}

И оригинальный жестко закодированный вызов можно заменить на

// produce the optional chained dynamic "whens".
val whenConditions = dynamicWhen(conditions)

// map it in a DataFrame with a new column or keep the original one, if there were no "whens".
val result = whenConditions.map(cond => df.withColumn("AMGClassRule", cond))
                           .getOrElse(df)

Наконец, короткий тест с поддельными данными:

val df = Seq(
  ("Derivatives", "Caps", "SPRD", "x", "N"),
  ("Derivatives", "Caps", "SPRD", "TROR", "Y"),
  ("Derivatives", "Caps", "TRSY", "x", "Y"),
  ("Derivatives", "Caps", "TRSY", "TROR", "N"),
  ("Derivatives", "Caps", "zzzz", "x", "N")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "DerivativeType", "InDefaultInd")

val result = ... // transformations above
result.show(false)

+--------------------+--------------------+------------+--------------+------------+------------+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|DerivativeType|InDefaultInd|AMGClassRule|
+--------------------+--------------------+------------+--------------+------------+------------+
|Derivatives         |Caps                |SPRD        |x             |N           |1           |
|Derivatives         |Caps                |SPRD        |TROR          |Y           |1           |
|Derivatives         |Caps                |TRSY        |x             |Y           |4           |
|Derivatives         |Caps                |TRSY        |TROR          |N           |3           |
|Derivatives         |Caps                |zzzz        |x             |N           |null        |
+--------------------+--------------------+------------+--------------+------------+------------+

Заключительная обновление

Обновление 2

Пример чтения условий из DataFrame.

Предположим, что в DataFrame хранятся следующие описания условий:

val rulesDf = Seq(
  ("Derivatives", "%", "%", "16"),
  ("Derivatives", "Fx Options", "%", "17"),
  ("Derivatives", "Futures", "%", "48")
).toDF("CoreSectorLevel1Code", "CoreSectorLevel2Code", "FAS157Flavor", "rule")

rulesDf.show(false)

+--------------------+--------------------+------------+----+
|CoreSectorLevel1Code|CoreSectorLevel2Code|FAS157Flavor|rule|
+--------------------+--------------------+------------+----+
|Derivatives         |%                   |%           |16  |
|Derivatives         |Fx Options          |%           |17  |
|Derivatives         |Futures             |%           |48  |
+--------------------+--------------------+------------+----+

Мы можем читать и преобразовывать их в WhenCondition упаковщики, используя следующее:

private def readConditions(): Seq[WhenCondition] = {
  val ruleColumnName = "rule"
  val ruleColumnIndex = rulesDf.schema.fieldIndex(ruleColumnName)
  val conditionColumns = rulesDf.schema.fieldNames.filter(_ != ruleColumnName).toSeq
  rulesDf.rdd.map(row => {
    val valuesToCheck = conditionColumns.map(colName => (colName, row.get(row.fieldIndex(colName))))
    val rule = row(ruleColumnIndex)
    WhenCondition(valuesToCheck, rule)
  }).collect().toSeq
}

readConditions().foreach(println)

// outputs:
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,%), (FAS157Flavor,%)),16)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Fx Options), (FAS157Flavor,%)),17)
WhenCondition(mutable.ArrayBuffer((CoreSectorLevel1Code,Derivatives), (CoreSectorLevel2Code,Futures), (FAS157Flavor,%)),48)

Конец обновления 2

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...