Ключ, соответствующий максимальному значению в столбце карты искры - PullRequest
0 голосов
/ 08 января 2020

Если у меня есть столбец карты искры от строки к двойному, легко ли сгенерировать новый столбец с ключом, соответствующим максимальному значению?

Я смог добиться этого, используя функции сбора, как показано ниже:

import org.apache.spark.sql.functions._

val mockedDf = Seq(1, 2, 3)
  .toDF("id")
  .withColumn("optimized_probabilities_map", typedLit(Map("foo"->0.34333337, "bar"->0.23)))
val df = mockedDf
  .withColumn("optimizer_probabilities", map_values($"optimized_probabilities_map"))
  .withColumn("max_probability", array_max($"optimizer_probabilities"))
  .withColumn("max_position", array_position($"optimizer_probabilities", $"max_probability"))
  .withColumn("optimizer_ruler_names", map_keys($"optimized_probabilities_map"))
  .withColumn("optimizer_ruler_name", $"optimizer_ruler_names"( $"max_position"))

Однако это решение неоправданно долго и не очень эффективно. Существует также возможная проблема точности, так как я сравниваю удвоения при использовании array_position. Интересно, есть ли лучший способ сделать это без UDF, возможно, используя строку выражения.

Ответы [ 2 ]

1 голос
/ 08 января 2020

Так как вы можете использовать Spark 2.4+, одним из способов является использование встроенной функции Spark- SQL агрегат , где мы перебираем все map_keys, а затем сравниваем соответствующие значения map_values ​​с буферизованными значениями acc.val и затем обновите acc.name соответственно:

mockedDf.withColumn("optimizer_ruler_name", expr("""
    aggregate(
      map_keys(optimized_probabilities_map), 
      (string(NULL) as name, double(NULL) as val),
      (acc, y) ->
        IF(acc.val is NULL OR acc.val < optimized_probabilities_map[y]
        , (y as name, optimized_probabilities_map[y] as val)
        , acc
        ),
      acc -> acc.name
    )
""")).show(false)
+---+--------------------------------+--------------------+
|id |optimized_probabilities_map     |optimizer_ruler_name|
+---+--------------------------------+--------------------+
|1  |[foo -> 0.34333337, bar -> 0.23]|foo                 |
|2  |[foo -> 0.34333337, bar -> 0.23]|foo                 |
|3  |[foo -> 0.34333337, bar -> 0.23]|foo                 |
+---+--------------------------------+--------------------+
1 голос
/ 08 января 2020

Другим решением было бы взорвать столбец карты и затем использовать функцию Window, чтобы получить максимальное значение, подобное этому:

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"id")

val df = mockedDf.select($"id", $"optimized_probabilities_map", explode($"optimized_probabilities_map"))
                 .withColumn("max_value", max($"value").over(w))
                 .where($"max_value" === $"value")
                 .drop("value", "max_value")
...