Суммируйте столбцы информационного кадра Spark и создайте другой информационный кадр - PullRequest
1 голос
/ 28 марта 2019

У меня есть кадр данных, как показано ниже -

enter image description here

Я пытаюсь создать другой фрейм данных из этого, который имеет 2 столбца - имя столбца и сумму значений в каждом столбце, как это -

enter image description here

Пока что я пробовал это (в Spark 2.2.0), но выдает трассировку стека -

val get_count: (String => Long) = (c: String) => {
    df.groupBy("id")
      .agg(sum(c) as "s")
      .select("s")
      .collect()(0)
      .getLong(0)
}
val sqlfunc = udf(get_count)

summary = summary.withColumn("sum_of_column", sqlfunc(col("c")))

Есть ли другие варианты выполнения этой задачи?

Ответы [ 2 ]

2 голосов
/ 28 марта 2019

Я думаю, что самый эффективный способ - это выполнить агрегацию, а затем создать новый фрейм данных.Таким образом вы избежите дорогостоящего explode.

Сначала давайте создадим фрейм данных.Кстати, всегда приятно предоставить код, чтобы сделать это, когда вы задаете вопрос.Таким образом, мы можем воспроизвести вашу проблему за считанные секунды.

val df = Seq((1, 1, 0, 0, 1), (1, 1, 5, 0, 0),
             (0, 1, 0, 6, 0), (0, 1, 0, 4, 3))
    .toDF("output_label", "ID", "C1", "C2", "C3")

Затем мы создаем список интересующих нас столбцов, агрегирование и вычисляем результат.

val cols = (1 to 3).map(i => s"C$i")
val aggs = cols.map(name => sum(col(name)).as(name))
val agg_df = df.agg(aggs.head, aggs.tail :_*) // See the note below
agg_df.show
+---+---+---+
| C1| C2| C3|
+---+---+---+
|  5| 10|  4|
+---+---+---+

МыПочти у нас есть все, что нам нужно, нам просто нужно собрать данные и построить новый фрейм данных:

val agg_row = agg_df.first
cols.map(name => name -> agg_row.getAs[Long](name))
    .toDF("column", "sum")
    .show
+------+---+
|column|sum|
+------+---+
|    C1|  5|
|    C2| 10|
|    C3|  4|
+------+---+

РЕДАКТИРОВАТЬ:

NB: df.agg(aggs.head, aggs.tail :_*) может показаться странным.Идея состоит в том, чтобы просто вычислить все агрегаты, вычисленные в aggs.Можно ожидать чего-то более простого, например df.agg(aggs : _*).Тем не менее, подпись метода agg выглядит следующим образом:

def agg(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)

возможно, чтобы гарантировать использование хотя бы одного столбца, и поэтому вам нужно разделить aggs на aggs.head и aggs.tail.

1 голос
/ 28 марта 2019

Что мне нужно сделать, это определить метод для создания структуры из желаемых значений:

  def kv (columnsToTranspose: Array[String]) = explode(array(columnsToTranspose.map {
    c => struct(lit(c).alias("k"), col(c).alias("v"))
  }: _*))

Эта функция получает список столбцов для транспонирования (ваши 3 последних столбца в вашем случае) и преобразовывает ихв структуре с именем столбца в качестве ключа и значением столбца в качестве значения

А затем используйте этот метод для создания структуры и обработки ее так, как вы хотите

df.withColumn("kv", kv(df.columns.tail.tail))
.select( $"kv.k".as("column"), $"kv.v".alias("values"))
.groupBy("column")
.agg(sum("values").as("sum"))

Сначала примените предыдущее определенное значениефункция, чтобы иметь желаемые столбцы в качестве указанной структуры, а затем деконструировать структуру, чтобы иметь ключ столбца и значение столбца в каждой строке.Затем вы можете агрегировать по имени столбца и суммировать значения

ВХОД

+------------+---+---+---+---+
|output_label| id| c1| c2| c3|
+------------+---+---+---+---+
|           1|  1|  0|  0|  1|
|           1|  1|  5|  0|  0|
|           0|  1|  0|  6|  0|
|           0|  1|  0|  4|  3|
+------------+---+---+---+---+

ВЫХОД

+------+---+
|column|sum|
+------+---+
|    c1|  5|
|    c3|  4|
|    c2| 10|
+------+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...