Как добавить новый столбец с максимальным значением? - PullRequest
0 голосов
/ 31 мая 2018

У меня есть Dataframe с 2 столбцами tag и value.

Я хочу добавить новый столбец, содержащий столбец max из value.(Это будет одинаковое значение для каждой строки).

Я пытался сделать что-то следующим образом, но это не сработало.

val df2 = df.withColumn("max",max($"value"))

Как добавить столбец maxк набору данных?

Ответы [ 2 ]

0 голосов
/ 31 мая 2018

Есть 3 способа сделать это (один вы уже знаете из другого ответа).Я избегаю collect, так как он на самом деле не нужен.

Вот набор данных с максимальным значением 3, появляющимся дважды.

val tags = Seq(
  ("tg1", 1), ("tg2", 2), ("tg1", 3), ("tg4", 4), ("tg3", 3)
).toDF("tag", "value")
scala> tags.show
+---+-----+
|tag|value|
+---+-----+
|tg1|    1|
|tg2|    2|
|tg1|    3| <-- maximum value
|tg4|    4|
|tg3|    3| <-- another maximum value
+---+-----+

Декартово соединение с набором данных "Max"

Я собираюсь использовать декартово объединение tags и однорядный набор данных с максимальным значением.

val maxDF = tags.select(max("value") as "max")
scala> maxDF.show
+---+
|max|
+---+
|  4|
+---+
val solution = tags.crossJoin(maxDF)
scala> solution.show
+---+-----+---+
|tag|value|max|
+---+-----+---+
|tg1|    1|  4|
|tg2|    2|  4|
|tg1|    3|  4|
|tg4|    4|  4|
|tg3|    3|  4|
+---+-----+---+

Я не беспокоюсь о декартовом объединении здесь, так как это всего лишь одно-row набор данных.

Оконная агрегация

Моя любимая оконная агрегация прекрасно подходит для этой задачи.С другой стороны, я не думаю, что это был бы самый эффективный подход из-за количества используемых разделов, то есть только 1, что дает наихудший параллелизм.

Хитрость заключается в использованиифункция агрегирования max над спецификацией пустого окна, которая сообщает Spark SQL об использовании всех строк в любом порядке.

val solution = tags.withColumn("max", max("value") over ())
scala> solution.show
18/05/31 21:59:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+-----+---+
|tag|value|max|
+---+-----+---+
|tg1|    1|  4|
|tg2|    2|  4|
|tg1|    3|  4|
|tg4|    4|  4|
|tg3|    3|  4|
+---+-----+---+

Обратите внимание на предупреждение, в котором все сказано.

WindowExec: раздел не определен для работы с окном!Перемещение всех данных в один раздел может привести к серьезному снижению производительности.

Я бы не стал использовать этот подход с учетом других решений и оставляю его здесь для образовательных целей.

0 голосов
/ 31 мая 2018

Если вы хотите максимальное значение столбца для всех строк, вам нужно сравнить все строки в некоторой форме.Это означает создание агрегации.withColumn работает только с одной строкой, поэтому вы не можете получить максимальное значение DataFrame.

Самый простой способ сделать это, как показано ниже:

val data = Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4))
val df = sc.parallelize(data).toDF("name", "value")

// first is an action, so this will execute spark stages to compute the value
val maxValue = df.groupBy().agg(max($"value")).first.getInt(0)

// Now you can add it to your original DF
val updatedDF = df.withColumn("max", lit(maxValue))

updatedDF.show

Существует также одна альтернативаэто может быть немного быстрее.Если вам не нужно максимальное значение до конца процесса (после того, как вы уже запустили искровое действие), вы можете вычислить его, написав вместо этого свой собственный Аккумулятор Spark, который собирает значение, выполняя любую другую работу Spark Action, которую вы запросили..

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...