Как преобразовать значения карты, используя встроенные функции Spark? - PullRequest
0 голосов
/ 11 марта 2020

У меня есть простой набор данных со схемой:

root
 |-- columns: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = false)
 |    |    |-- a: integer (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: float (nullable = true)
 |    |    |-- d: double (nullable = true)
           |-- ... 
           |-- ...

Пример:

+---------------------------------------------------+
|columns                                            |
+---------------------------------------------------+
|[k0 -> [,,,, 2,], k1 -> [,,,, AB,], k2 -> [,,M,,,] |
+---------------------------------------------------+

Я хочу преобразовать свой набор данных в новый набор данных со схемой:

root
 |-- columns: map (nullable = true)
 |    |-- key: string
 |    |-- value: string

Правила преобразования:

  1. Размер структуры не определен.
  2. Получить 1-й непустой элемент из структуры значений (в виде строки).

Пример вывода:

+----------------------------+
|columns                     |
+----------------------------+
|[k0 -> 2, k1 -> AB, k2 -> M |
+----------------------------+

Вот мое решение UDF

val my_udf: UserDefinedFunction = udf((m: Map[String, Row]) => m.map { case (k, v) => (k, v.toSeq.find(_ != null).map(_.toString)) })

df.select(my_udf(col("columns")))

Можно ли переписать его с помощью встроенных функций Spark?

Примерно так:

df.withColumn("data", expr("transform(fields.items(), (k, v) -> (k, get-1st-not-null-element-from-v)"))

Вот еще одна попытка (Spark 3.0 +):

df.select(map_entries(col("fields")).as("array"))
.select(
  expr(
    "transform(array, (e, _) -> " +
      "struct(cast(e.key as string), coalesce(e.value.a, e.value.b, e.value.c, e.value.d, ...)))"
    ).as("entries")
  )
.select(map_from_entries(col("entries")))

1 Ответ

1 голос
/ 12 марта 2020

IIU C, вы можете попробовать преобразовать + агрегат (предполагается, что имя столбца col1):

df.selectExpr("""
  aggregate(
    transform(map_keys(col1), x -> map(x, coalesce(col1[x].a,col1[x].b,col1[x].c,col1[x].d))), 
    /* zero_value: use an empty map() */
    map(), 
    /* merge: do map_concat() */
    (acc,y) -> map_concat(acc, y)
  )  as col1
""").show()

Где :

  • Используйте функцию transform () для перебора массива из map_keys, преобразования каждого элемента x в карту с x в качестве ключа и установите значение первого ненулевого значения из поля StructType, используя coalesce(col1[x].a,col1[x].b,col1[x].c,col1[x].d). Это приведет к массиву карт.

  • используйте функцию aggregate () для объединения вышеуказанного массива карт в столбец MapType.

Для spark 3.0+ используйте transform_values ​​:

df.selectExpr("transform_values(col1, (k,v) -> coalesce(v.a, v.b, v.c, v.d)) as col1").show()
...