Возможно, это будет полезно (код написан на Scala).
Приведены оба варианта: для случая, когда фрейм данных измерений (справочник df1) велик, и для случая, когда он мал.
1. Загрузите оба фрейма данных
// Pipe-delimited sample table (header line + 4 rows) used to build df1.
// stripMargin removes the leading '|' margin character from every line.
val data1 =
"""
|id | name
|1 | Alex
|2 | Bob
|3 | Chris
|4 | Kevin
""".stripMargin
// Turn every line into a comma-separated record: split on '|', strip spaces/tabs
// around each cell, re-join with ','.
// The text is split on "\\R" (any line break) instead of System.lineSeparator():
// the line breaks inside the literal come from the source file, not from the
// platform the code runs on, so the platform separator may not match them.
// The leading blank line of the literal yields an empty record.
val stringDS1 = data1.split("\\R")
  .map { line =>
    line.split("\\|")
      .map(_.replaceAll("""^[ \t]+|[ \t]+$""", ""))
      .mkString(",")
  }
  .toSeq.toDS()
// Reader settings for the in-memory CSV lines: comma separator, first line is the
// header, column types are inferred, and the text "null" is configured as the null marker.
val csvOptions = Map(
  "sep" -> ",",
  "inferSchema" -> "true",
  "header" -> "true",
  "nullValue" -> "null"
)
// Parse the Dataset[String] of CSV records into the dimension DataFrame (id, name).
val df1 = spark.read.options(csvOptions).csv(stringDS1)
// Print the full (untruncated) contents and the inferred schema.
df1.show(truncate = false)
df1.printSchema()
/**
* +---+-----+
* |id |name |
* +---+-----+
* |1 |Alex |
* |2 |Bob |
* |3 |Chris|
* |4 |Kevin|
* +---+-----+
*
* root
* |-- id: integer (nullable = true)
* |-- name: string (nullable = true)
*/
// Inline SQL VALUES table: every row has an id and a `friends` array whose elements
// are structs with the fields (id, score).
val friendsQuery =
"""
|select id, friends from values
| (1, array(named_struct('id', 2, 'score', 49), named_struct('id', 3, 'score', 15))),
| (2, array(named_struct('id', 4, 'score', 61), named_struct('id', 2, 'score', 49), named_struct('id', 3,
| 'score', 4)))
| T(id, friends)
""".stripMargin
// Fact DataFrame holding the friend lists that are later resolved against df1.
val df2 = spark.sql(friendsQuery)
// Print the full (untruncated) contents and the schema.
df2.show(truncate = false)
df2.printSchema()
/**
* +---+--------------------------+
* |id |friends |
* +---+--------------------------+
* |1 |[[2, 49], [3, 15]] |
* |2 |[[4, 61], [2, 49], [3, 4]]|
* +---+--------------------------+
*
* root
* |-- id: integer (nullable = false)
* |-- friends: array (nullable = false)
* | |-- element: struct (containsNull = false)
* | | |-- id: integer (nullable = false)
* | | |-- score: integer (nullable = false)
*/
2. Если фрейм данных измерений (df1) большой
// Variant for a large df1: resolve the names with a regular join.
// Step 1: emit one (id, friend_id) row per element of friends.id.
val exploded = df2.select(df2("id"), explode(expr("friends.id")).alias("friend_id"))
// Step 2: look up each friend's name in df1, then gather the names back per original id.
// This is an inner join, so a friend_id without a matching df1 row is dropped.
// NOTE(review): collect_list gives no ordering guarantee after the shuffle, so the
// names may come back in a different order than the original friends array.
val friendNamesById = exploded
  .join(df1, exploded("friend_id") === df1("id"))
  .groupBy(exploded("id"))
  .agg(collect_list(df1("name")).alias("friends"))
friendNamesById.show(truncate = false)
/**
* +---+-------------------+
* |id |friends |
* +---+-------------------+
* |2 |[Bob, Chris, Kevin]|
* |1 |[Bob, Chris] |
* +---+-------------------+
*/
3. Если фрейм данных измерений (df1) маленький
// Variant for a small df1: collect it to the driver and broadcast an id -> name map,
// so df2 is transformed without a join.
// Rows with a null id or name are removed before collecting: a null cell would not
// match the typed Row pattern below (MatchError). Selecting the two columns
// explicitly keeps the pattern valid even if df1 carries extra columns.
val b = spark.sparkContext.broadcast(
  df1.select("id", "name").na.drop()
    .collect()
    .map { case Row(id: Int, name: String) => id -> name }
    .toMap
)
// Maps an array of friend ids to the corresponding names, preserving array order.
// - The parameter is typed as Seq[Int], so the UDF does not depend on the concrete
//   Seq implementation Spark passes for an array column.
// - Ids missing from the broadcast map are skipped instead of throwing
//   NoSuchElementException; this matches the join-based variant, where unmatched
//   friends are dropped by the inner join.
val getFriendsName = udf((idArray: Seq[Int]) => idArray.flatMap(id => b.value.get(id)))
// Replace the array of (id, score) structs with the array of friend names.
df2.withColumn("friends", getFriendsName(expr("friends.id")))
  .show(false)
/**
* +---+-------------------+
* |id |friends |
* +---+-------------------+
* |1 |[Bob, Chris] |
* |2 |[Kevin, Bob, Chris]|
* +---+-------------------+
*/