Добавление постоянного значения в каждый раздел с помощью Spark Scala - PullRequest
2 голосов
/ 01 мая 2019

Я пытаюсь добавить идентификатор для каждой группы дат, используя Spark Scala.

Например, если ввод был:

date
2019-01-29
2019-01-29
2019-07-31
2019-01-29
2019-07-31

Вывод будет:

id, date
ABC1, 2019-01-29
ABC1, 2019-01-29
ABC1, 2019-01-29
ABC2, 2019-07-31
ABC2, 2019-07-31

Может ли кто-нибудь помочь мне с этим?

Мне удалось добавить последовательные номера строк для каждого раздела, но я хотел бы получить постоянное значение для каждого раздела.

df.withColumn(lineNumColName, row_number().over(Window.partitionBy(partitionByCol).orderBy(orderByCol))).repartition(1).orderBy(orderByCol, lineNumColName)

1 Ответ

2 голосов
/ 01 мая 2019

Вариант 1 (маленький набор данных):

Если ваш набор данных не слишком большой, вы можете использовать Window и dens_rank, как показано ниже:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{concat,lit, dense_rank}

val df = Seq(("2019-01-29"),
("2019-01-29"),
("2019-07-31"),
("2019-01-29"),
("2019-07-31")).toDF("date")

val w = Window.orderBy($"date") 
val d_rank = dense_rank().over(w)
df.withColumn("id",  concat(lit("ABC"), d_rank)).show(false)

Вывод:

+----------+----+
|date      |id  |
+----------+----+
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-07-31|ABC2|
|2019-07-31|ABC2|
+----------+----+

Поскольку мы не указываем никакого значения для части partitionBy, будет использоваться только один раздел, и поэтому он будет очень неэффективным.

Вариант 2 (большой набор данных):

Более эффективный подход заключается в назначении идентификаторов для большого набора данных с использованием функции zipWithIndex:

val df_d = df.distinct.rdd.zipWithIndex().map{ r => (r._1.getString(0), r._2 + 1) }.toDF("date", "id")
df_d.show

// Output:
+----------+---+
|      date| id|
+----------+---+
|2019-01-29|  1|
|2019-07-31|  2|
+----------+---+

Сначала мы получаем уникальное значение кадра данных с помощью distinct, затем мы вызываем zipWithIndex, чтобы создать уникальный идентификатор для каждой записи даты.

Наконец, мы объединяем два набора данных:

df.join(df_d, Seq("date"))
.withColumn("id",  concat(lit("ABC"), $"id"))
.show

// Output:
+----------+----+
|      date|  id|
+----------+----+
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-01-29|ABC1|
|2019-07-31|ABC2|
|2019-07-31|ABC2|
+----------+----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...