Объединить столбцы в одну карту с UDF из массива имен столбцов - PullRequest
1 голос
/ 28 июня 2019

Итак, у меня есть данные со значениями, которые нужно сложить вместе, а затем поместить в формат Map[String,Long] для сохранения в Cassandra.

Приведенный ниже код работает, однако мне было интересно, можно ли отобразить картусоздан на основе абстрактного списка столбцов.(Глядя на исходный код для их функций, я только растерялся).

var cols = Array("key", "v1", "v2")
var df = Seq(("a",1,0),("b",1,0),("a",1,1),("b",0,0)).toDF(cols: _*)
val df1 = df.groupBy(col(cols(0))).
  agg(map(lit(cols(1)), sum(col(cols(1))), lit(cols(2)), sum(col(cols(2)))) as "map")

Это мой желаемый формат для кадра данных и текущего результата с указанным выше кодом:

scala> df1.show(false)
+---+---------------------+
|key|map                  |
+---+---------------------+
|b  |Map(v1 -> 1, v2 -> 0)|
|a  |Map(v1 -> 2, v2 -> 1)|
+---+---------------------+

Я хотел бы видеть функцию, которая может возвращать то же, что и выше, но иметь возможность размещать столбцы программно на основе имени.Например:

var columnNames = Array("v1", "v2")
df.groupBy(col(cols(0))).agg(create_sum_map(columnNames) as "map")

Возможно ли это даже в Spark удаленно?

Ответы [ 2 ]

1 голос
/ 01 июля 2019

Итак, я понял, как получить результат моего разыскиваемого ответа на основе ответа @ Shaido.

def create_sum_map(cols: Array[String]): Column = 
  map(cols.flatMap(c => Seq(lit(c), sum(col(c)))):_*)

df.groupBy(col(cols.head)).agg(create_sum_map(columnNames) as "map")

Я предполагаю, что это работает, потому что sum(Column) с соответствующими столбцами присутствует в create_sum_map() в функции .agg().

1 голос
/ 28 июня 2019

Нет необходимости использовать медленный UDF, этого можно достичь с помощью чисто встроенных функций Spark и переменных, см., Например, Spark SQL: применить агрегатные функции к списку столбцов .Это решение требует построения списка столбцов, к которым можно применить агрегацию.Здесь все немного сложнее, поскольку вы хотите map в конечном выводе, для этого требуется дополнительный шаг.

Сначала создайте выражения (столбцы) для использования в агрегации:

val exprs = cols.tail.flatMap(c => Seq(lit(c), sum(col(c))))

Примените группу к и используйте созданный exprs:

val df2 = df.groupBy(col(cols.head)).agg(exprs.head, exprs.tail:_*)
  .select(col(cols.head), map(cols.tail.flatMap(c => Seq(col(c), col(s"sum($c)"))):_*).as("map"))

Над дополнительным select требуется для создания map, а cols.tail.flatMap(c => Seq(col(c), col(s"sum($c)")) - это просто списокновые столбцы, которые должны быть добавлены в map.

Полученный результат идентичен предыдущему:

+---+---------------------+
|key|map                  |
+---+---------------------+
|b  |Map(v1 -> 1, v2 -> 0)|
|a  |Map(v1 -> 2, v2 -> 1)|
+---+---------------------+
...