Группировка строк в Spark - PullRequest
       7

Группировка строк в Spark

1 голос
/ 26 октября 2019

Я использую Spark 2.4.0 и хотел бы узнать, как решить следующую проблему с помощью Spark:

Каждая запись в нижеприведенных фреймах данных может иметь два разных формата с предоставленными идентификаторами. Я хотел бы сгруппировать их, если две записи имеют один и тот же идентификатор (либо в формате 1, либо в формате 2), и назначить идентификатор группы для каждой из групп.

например,

Фрейм входных данных:

-------------------------------
Format1 Id    | Format2 Id    |
-------------------------------
Format1_1     |   Format2_1   |
Format1_2     |   Format2_1   |
Format1_3     |   Format2_1   |
Format1_4     |   Format2_2   |

Фрейм выходных данных:

-------------------------------------------  
Format1 Id    | Format2 Id    | Group Id   |
-------------------------------------------
Format1_1     |   Format2_1   |  1         |
Format1_2     |   Format2_1   |  1         |
Format1_3     |   Format2_1   |  1         |
Format1_4     |   Format2_2   |  2         |

Поскольку первые 3 записи имеют один и тот же идентификатор в формате 2, они сгруппированы вместе и назначены один и тот же идентификатор группы.

Последняя запись не связана с драгоценными тремя записями. Он рассматривается как одна группа.

Я пытался использовать HashMap (String, Int) для сопоставления каждого идентификатора формата с соответствующим идентификатором группы, но, поскольку этот HashMap не распределен по узлам, другие рабочие узлыне удалось прочитать ранее вставленные значения карты.

Я новичок в Spark и хотел бы знать, как я могу реализовать эту проблему группировки в Spark?

Ответы [ 2 ]

1 голос
/ 26 октября 2019

Это можно рассматривать как проблему графа, где каждый идентификатор представляет собой узел, а пара идентификаторов представляет ребра. Таким образом, запрос заключается в том, чтобы найти все подключенные компоненты и добавить идентификатор компонента обратно в исходный кадр данных.

import org.graphframes._ //execute: spark-shell --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

case class Data(id1: Int, id2: Int) 
val data = Seq(Data(1,2), Data(1,3), Data(4,3), Data(4,5)) //sample dataset for testing
val df = data.toDF()

+---+---+
|id1|id2|
+---+---+
|  1|  2|
|  1|  3|
|  4|  3|
|  4|  5|
+---+---+

val nodes = df.select(concat(lit("id1_"), col("id1")).alias("id")).distinct.union(df.select(concat(lit("id2_"), col("id2")).alias("id")).distinct)
val edges = df.select(concat(lit("id1_"), col("id1")).alias("src"), concat(lit("id2_"), col("id2")).alias("dst"))
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val g = GraphFrame(nodes, edges)
val comDf = g.connectedComponents.run()

val result = df.withColumn("id", concat(lit("id1_"), col("id1"))).join(comDf, Seq("id"), "left_outer").select("id1","id2","component")

+---+---+------------+
|id1|id2|   component|
+---+---+------------+
|  1|  2|154618822656|
|  1|  3|154618822656|
|  4|  3|154618822656|
|  4|  5|154618822656|
+---+---+------------+
0 голосов
/ 26 октября 2019

Вы можете использовать функцию dense_rank().

scala> df.show()
+---------+---------+
|      _c0|      _c1|
+---------+---------+
|Format1_1|Format2_1|
|Format1_2|Format2_1|
|Format1_3|Format2_1|
|Format1_4|Format2_2|
+---------+---------+

// dataframe api
scala> df.withColumn("group_id",dense_rank().over(Window.orderBy('_c1))).show()

+---------+---------+--------+
|      _c0|      _c1|group_id|
+---------+---------+--------+
|Format1_1|Format2_1|       1|
|Format1_2|Format2_1|       1|
|Format1_3|Format2_1|       1|
|Format1_4|Format2_2|       2|
+---------+---------+--------+

// sql
scala> spark.sql("select df.*, dense_rank() over (order by _c1) as group_id from df").show()
...