Группировка в фрейме Apache Spark - PullRequest
1 голос
/ 11 мая 2019

Я новичок в Apache Spark.Ниже мой Spark dataframe, который создается, когда я читаю CSV-файл.

Parent  Keyword   Volume
P1       K1        100
P1       K2        200
P1       K3        150
P2       K4        100
P2       K5        200

Мне нужно преобразовать приведенный выше кадр данных в следующий.Логика заключается в том, что все ключевые слова, принадлежащие одному и тому же родителю, связаны между собой и должны быть перечислены в отсортированном порядке по объему.Например, K1, K2, K3 принадлежат одному и тому же родителю P1, поэтому они все связаны.Так, для K1, ключевыми словами являются K2 и K3.K2 отображается первым, потому что его громкость (200) превышает K3(150).

Keyword   Related_keywords
K1         K2, K3
K2         K3, K1
K3         K2, K1
K4         K5
K5         K4

Может кто-нибудь, пожалуйста, помогите мне.Я новичок в Spark и, глядя на эту проблему, можно использовать groupBy, но не знаю точно, как превратить первый кадр данных во второй.

1 Ответ

1 голос
/ 11 мая 2019

Хотя это можно сделать с помощью groupBy, оконные функции обычно проще, когда вам нужны все исходные строки во фрейме результирующих данных. Мы можем использовать collect_list, но, как говорит документ , порядок является недетерминированным, поэтому давайте создадим кортежи томов и ключевых слов:

val txt =
  """Parent  Keyword   Volume
    |P1       K1        100
    |P1       K2        200
    |P1       K3        150
    |P2       K4        100
    |P2       K5        200""".stripMargin.lines
    .map(_.split("\\s+").mkString("|"))
    .toSeq
    .toDS()
val df = spark.read
  .option("inferSchema", true)
  .option("header", true)
  .option("delimiter", "|")
  .csv(txt)

val win = Window.partitionBy($"Parent")
val df1 =
  df.select($"Keyword",
            collect_list(struct(-$"Volume", $"Keyword")).over(win) as "rel")

Теперь у нас почти есть нужный формат

df1.select(array_sort($"rel") as "Related_keywords")
  .show(20, false)

Выход:

+------------------------------------+
|Related_keywords                    |
+------------------------------------+
|[[-200, K5], [-100, K4]]            |
|[[-200, K5], [-100, K4]]            |
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
|[[-200, K2], [-150, K3], [-100, K1]]|
+------------------------------------+

Однако есть две проблемы, оригинальная Keyword будет дублирована в списке, и перед всеми ключевыми словами будет отрицательный объем. Чтобы сделать это красивее, я считаю, что нужны UDF: s (не удалось найти функцию SQL для распаковки кортежей):

val myudf = udf(
  (keyword: String, rel: Seq[Row]) =>
    rel
      .collect {
        case Row(volume: Int, kw: String) if kw != keyword => (volume, kw)
      }
      .sortBy(_._1)
      .map(_._2))

df1.select($"Keyword", myudf($"Keyword", $"rel") as "Related_keywords")
  .show(20, false)

Выход:

+-------+----------------+
|Keyword|Related_keywords|
+-------+----------------+
|K4     |[K5]            |
|K5     |[K4]            |
|K1     |[K2, K3]        |
|K2     |[K3, K1]        |
|K3     |[K2, K1]        |
+-------+----------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...