Как преобразовать столбец массивов в столбец с суммой каждого массива в Spark? - PullRequest
0 голосов
/ 25 июня 2018

Каждый подход, который я пробовал, оставляет мне сумму всего столбца.У каждой строки есть массив, заполненный двойниками.Мне нужен столбец сумм для каждой строки.

Итак, вы начинаете с кадра данных, который выглядит следующим образом:

id   c2   c3
-------------------------
1     1   [2.0, 1.0, 0.0]
2     2   [0.0, 0,0, 0.0]

И в результате я хочу это:

id   c2   c3sum
-------------------------
1     1   3.0
2     2   0.0

Я попытался использовать метод sum после выполнения groupBy для "id".Я также пытался использовать udf:

def mySum(arr:Seq[Double]):Double=arr.reduceLeft(_+_)
val dfsum = df.withColumn("c3sum", mySum($"c3"))

Эти и другие варианты udf всегда возвращали сумму всего в столбце.В качестве теста я также попытался использовать array.max, чтобы просто получить максимальное число для каждого массива вместо их суммирования, и он вернул максимальное значение для всего столбца.Поэтому, скорее всего, это какая-то основная проблема синтаксиса, которую я не понимаю.

Заранее благодарю за помощь.

Ответы [ 2 ]

0 голосов
/ 25 июня 2018

Возможно, вы захотите использовать Dataset map с sum вместо того, чтобы полагаться на UDF:

import org.apache.spark.sql.functions._

val df = Seq(
  (1, 1, Array(2.0, 1.0, 0.0)),
  (2, 2, Array(0.0, 0.0, 0.0))
).toDF("id", "c2", "c3")

df.
  withColumn("c3", coalesce($"c3", lit(Array[Double]()))).
  as[(Int, Int, Array[Double])].
  map{ case (id, c2, c3) => (id, c2, c3.sum) }.
  toDF("id", "c2", "c3sum").
  show

// +---+---+-----+
// | id| c2|c3sum|
// +---+---+-----+
// |  1|  1|  3.0|
// |  2|  2|  0.0|
// +---+---+-----+

Обратите внимание, что перед преобразованием в набор данных coalesce применяется к c3, чтобы заменить null (если есть) пустым массивом [Double].

0 голосов
/ 25 июня 2018

Одним из возможных решений является использование udf (как вы уже пробовали).Чтобы это работало, вам нужно импортировать и использовать org.apache.spark.sql.functions.udf для создания udf.Рабочий пример:

import org.apache.spark.sql.functions.udf

val df = Seq(
    (1, 1, Seq(2.0, 1.0, 0.0)), 
    (2, 2, Seq(0.0, 0.0, 0.0)), 
    (3, 3, Seq(0.0, 1.0, 0.0))
).toDF("id", "c2", "c3")

val mySum = udf((arr: Seq[Double]) => arr.sum)
val dfsum = df.withColumn("c3sum", mySum($"c3"))

Даст:

+---+---+---------------+-----+
| id| c2|             c3|c3Sum|
+---+---+---------------+-----+
|  1|  1|[2.0, 1.0, 0.0]|  3.0|
|  2|  2|[0.0, 0.0, 0.0]|  0.0|
|  3|  3|[0.0, 1.0, 0.0]|  1.0|
+---+---+---------------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...