Spark - агрегирование и суммирование дочерних элементов с родительскими записями - PullRequest
0 голосов
/ 19 июня 2020

Я работаю с данными, имеющими древовидную структуру. У каждого родителя может быть несколько детей. Родитель не имеет информации о потомках, но каждый ребенок знает своего родителя. Кроме того, каждый дочерний элемент знает свой полный путь - строку сцепленных родительских идентификаторов, поэтому каждая запись знает свой уровень в дереве. Эта запись имеет структуру:

id  | parent_id | path
--- + --------- + ------
11  | 1         | 1-11
12  | 1         | 1-12
121 | 12        | 1-12-121

И теперь мне нужно прочитать таблицу, сгруппировать по идентификатору и просуммировать столбец value типа bigint. И наиболее важным фактом является то, что только листы - элементы без дочерних элементов - имеют указанное значение, и каждый родительский элемент должен быть суммой всех своих дочерних значений. Изначально все родители имеют значение 0.

Перед группировкой:

Root
| - Parent 1 (value = 0)
| - - Child 11 (value = 1)
| - - Child 12 (value = 1)
| - - Parent 13 (value = 0)
| - - - Child 131 (value = 2)
| - - - Child 132 (value = 1)
| - Parent 2 (value = 0)
| - - Child 21 (value = 2)
| - - Child 22 (value = 1)

Результат группировки:

Root
| - Parent 1 (value = 5 (1 + 1 + 3))
| - - Child 11 (value = 1)
| - - Child 12 (value = 1)
| - - Parent 13 (value = 3 (2 + 1))
| - - - Child 131 (value = 2)
| - - - Child 132 (value = 1)
| - Parent 2 (value = 3 (2 + 1))
| - - Child 21 (value = 2)
| - - Child 22 (value = 1)

И действительно важное требование: я не могу это собрать данные и сгруппировать в памяти, потому что набор данных действительно огромен, поэтому мне приходится делать это с использованием набора данных или фрейма данных.

1 Ответ

1 голос
/ 20 июня 2020

Если я правильно понял, вас интересует только сумма значений для каждого узла. В этом случае вам просто нужно смотреть каждый раз, когда узел появляется на одном из путей, и добавлять все такие значения для соответствующего узла. версия с искрой будет:

scala> val df = spark.sql(s"""
  select
    col1 as id,
    col2 as parent_id,
    col3 as path,
    col4 as value
  from values
    (11, 1, "1-11", 1),
    (12, 1, "1-12", 1),
    (13, 1, "1-13", 0),
    (131, 13, "1-13-131", 2),
    (132, 13, "1-13-132", 1)
""")


scala> (df
 .withColumn("path_arr", split(col("path"), "-"))
 .select($"value", explode($"path_arr").as("node"))
 .groupBy("node")
 .sum()
 .orderBy($"node")
).show

, что дает:

+----+----------+
|node|sum(value)|
+----+----------+
|   1|         5|
|  11|         1|
|  12|         1|
|  13|         3|
| 131|         2|
| 132|         1|
+----+----------+
...