Объединить карты в scala данных - PullRequest
1 голос
/ 24 апреля 2020

У меня есть фрейм данных со столбцами col1, col2, col3. col1, col2 - строки. col3 - это карта [String, String], определенная ниже

 |-- col3: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Я сгруппировалась по col1, col2 и агрегирована с использованием collect_list для получения массива карт и сохранена в col4.

 df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))

 |-- col4: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

Однако я хотел бы получить col4 как одну карту со всеми картами вместе взятыми. В настоящее время у меня есть:

[[a->a1,b->b1],[c->c1]]

Ожидаемый результат

[a->a1,b->b1,c->c1]

Было бы идеально использовать udf?

Любая помощь приветствуется. Спасибо.

Ответы [ 2 ]

1 голос
/ 24 апреля 2020

Вы можете использовать агрегат и map_concat :

import org.apache.spark.sql.functions.{expr, collect_list}

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)

// +---+------------------------------+
// |id |merged_data                   |
// +---+------------------------------+
// |1  |[k1 -> v1, k2 -> v3, k3 -> v3]|
// |2  |[k4 -> v4, k6 -> v6, k5 -> v5]|
// +---+------------------------------+

С map_concat мы объединяем все Map элементы данных столбец с помощью встроенной функции aggregate, которая позволяет применять агрегацию к парам списка.

Внимание : текущая реализация map_concat на Spark 2.4.5 это позволяет сосуществовать идентичные ключи. Скорее всего, это ошибка, так как она не соответствует ожидаемому поведению согласно официальной документации . Помните об этом.

Если вы хотите избежать такого случая, вы также можете go для UDF:

import org.apache.spark.sql.functions.{collect_list, udf}

val mergeMapUDF = udf((data: Seq[Map[String, String]]) => data.reduce(_ ++ _))

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeMapUDF($"data").as("merged_data"))
  .show(false)
0 голосов
/ 24 апреля 2020

Вы можете достичь этого без UDF. Давайте создадим ваш фрейм данных:

val df = Seq(Seq(Map("a" -> "a1", "b" -> "b1"), Map("c" -> "c1", "d" -> "d1"))).toDF()
df.show(false)
df.printSchema()

output:

+----------------------------------------+
|value                                   |
+----------------------------------------+
|[[a -> a1, b -> b1], [c -> c1, d -> d1]]|
+----------------------------------------+

root
 |-- value: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

Если ваш массив содержит 2 элемента, просто используйте map_concat:

df.select(map_concat('value.getItem(0), 'value.getItem(1))).show(false)

или этот (I понятия не имею, как динамически l oop из 0 в 'значение размера столбца типа массива, это может быть самое короткое решение)

df.select(map_concat((for {i <- 0 to 1} yield 'value.getItem(i)): _*)).show(false)

В противном случае, если ваш массив содержит несколько карт и размеров неизвестно, вы можете попробовать следующий метод:

  val df2 = df.map(s => {
    val list = s.getList[Map[String, String]](0)
    var map = Map[String, String]()
    for (i <- 0 to list.size() - 1) {
      map = map ++ list.get(i)
    }
    map
  })

  df2.show(false)
  df2.printSchema()

вывод:

+------------------------------------+
|value                               |
+------------------------------------+
|[a -> a1, b -> b1, c -> c1, d -> d1]|
+------------------------------------+

root
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
...