Сбор уникальных элементов во время агрегации Spark - PullRequest
0 голосов
/ 18 марта 2019

Задача

Мне нужно обновить эту строку в моем коде. Как я могу это сделать?

"case StringType => concat_ws(",",collect_list(col(c)))" 

Добавлять только те строки, которых еще нет в существующем поле. В этом примере буква «b» не будет появляться дважды.

код

val df =Seq(
     (1, 1.0, true, "a"),
     (2, 2.0, false, "b")
     (3, 2.0, false, "b")
     (3, 2.0, false, "c")
).toDF("id","d","b","s")

val dataTypes: Map[String, DataType] = df.schema.map(sf => 
(sf.name,sf.dataType)).toMap

def genericAgg(c:String) = {
  dataTypes(c) match {
  case DoubleType => sum(col(c))
  case StringType => concat_ws(",",collect_list(col(c)))
  case BooleanType => max(col(c))
  }
}

val aggExprs: Seq[Column] = df.columns.filterNot(_=="id")
.map(c => genericAgg(c))

df
  .groupBy("id")
  .agg(
    aggExprs.head,aggExprs.tail:_*
  )
  .show()

1 Ответ

1 голос
/ 18 марта 2019

Вы, вероятно, хотите использовать collect_set() вместо collect_list().Это автоматически удалит дубликаты во время сбора.

Я не уверен, почему вы хотите превратить массив уникальных строк в список, разделенный запятыми.Spark может легко обрабатывать столбцы массива, и они отображаются так, что каждый элемент можно увидеть.Тем не менее, если вам абсолютно необходимо преобразовать массив в строку с разделителями-запятыми, используйте array_join в Spark 2.4+ или UDF в более ранних версиях Spark.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...