Как сгруппировать и объединить список в Dataframe Spark Scala - PullRequest
0 голосов
/ 08 мая 2018

У меня есть датафрейм с двумя столбцами с данными, как показано ниже

+----+-----------------+
|acct|           device|
+----+-----------------+
|   B|       List(3, 4)|
|   C|       List(3, 5)|
|   A|       List(2, 6)|
|   B|List(3, 11, 4, 9)|
|   C|       List(5, 6)|
|   A|List(2, 10, 7, 6)|
+----+-----------------+

И мне нужен результат, как показано ниже

+----+-----------------+
|acct|           device|
+----+-----------------+
|   B|List(3, 4, 11, 9)|
|   C|    List(3, 5, 6)|
|   A|List(2, 6, 7, 10)|
+----+-----------------+

Я попытался, как показано ниже, но, похоже, не работает

df.groupBy("acct").agg(concat("device"))

df.groupBy("acct").agg(collect_set("device"))

Пожалуйста, дайте мне знать, как мне добиться этого с помощью Scala?

Ответы [ 3 ]

0 голосов
/ 08 мая 2018

Вы можете попробовать использовать collect_set и Window. В вашем случае:

df.withColumn("device", collect_set("device").over(Window.partitionBy("acct")))
0 голосов
/ 08 мая 2018

Другая опция, которая может работать лучше, чем опция explode: создание собственной UserDefinedAggregationFunction , которая объединяет списки в отдельные наборы.

Вам нужно будет продлить UserDefinedAggregateFunction следующим образом:

class MergeListsUDAF extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(Seq(StructField("a", ArrayType(IntegerType))))

  override def bufferSchema: StructType = inputSchema

  override def dataType: DataType = ArrayType(IntegerType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, mutable.Seq[Int]())

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val existing = buffer.getAs[mutable.Seq[Int]](0)
    val newList = input.getAs[mutable.Seq[Int]](0)
    val result = (existing ++ newList).distinct
    buffer.update(0, result)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)

  override def evaluate(buffer: Row): Any = buffer.getAs[mutable.Seq[Int]](0)
}

И используйте это так:

val mergeUDAF = new MergeListsUDAF()

df.groupBy("acct").agg(mergeUDAF($"device"))
0 голосов
/ 08 мая 2018

Вы можете начать со взрыва столбца device и продолжить, как и раньше, но учтите, что он может не сохранить порядок списков (что в любом случае не гарантируется ни в одной группе):

val result = df.withColumn("device", explode($"device"))
  .groupBy("acct")
  .agg(collect_set("device"))

result.show(truncate = false)
// +----+-------------------+
// |acct|collect_set(device)|
// +----+-------------------+
// |B   |[9, 3, 4, 11]      |
// |C   |[5, 6, 3]          |
// |A   |[2, 6, 10, 7]      |
// +----+-------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...