Как получить сопоставленные значения в Pyspark? - PullRequest
3 голосов
/ 17 июня 2020

У меня есть фреймворк, который я использую для всех моих вычислений, в котором есть столбец id и столбец имени

id | name
1  | Alex
2  | Bob
3  | Chris
4  | Kevin

Я выполняю кучу операций и получаю их ближайших друзей, которые представляют собой список пар в форме [id, score]

id | friends
1  | [[2, 49], [3, 15]]
2  | [[4, 61], [2, 49], [3, 4]]

Как я могу сопоставить этот список друзей со списком имен? Очки теперь можно сбросить. В идеале это будет выглядеть как

id | friends
1  | [Bob, Chris]
2  | [Kevin, Bob, Chris]

Я думал о каком-то соединении, но не понимаю, как это будет работать, так как это список

Ответы [ 2 ]

2 голосов
/ 17 июня 2020

Возможно, это полезно (записано в scala)

Добавлены оба параметра, если фрейм данных измерения представляет малый и большой набор данных

1. Загрузите оба фрейма данных

 val data1 =
      """
        |id | name
        |1  | Alex
        |2  | Bob
        |3  | Chris
        |4  | Kevin
      """.stripMargin

    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +---+-----+
      * |id |name |
      * +---+-----+
      * |1  |Alex |
      * |2  |Bob  |
      * |3  |Chris|
      * |4  |Kevin|
      * +---+-----+
      *
      * root
      * |-- id: integer (nullable = true)
      * |-- name: string (nullable = true)
      */

    val df2 =
      spark.sql(
        """
          |select id, friends from values
          | (1, array(named_struct('id', 2, 'score', 49), named_struct('id', 3, 'score', 15))),
          | (2, array(named_struct('id', 4, 'score', 61), named_struct('id', 2, 'score', 49), named_struct('id', 3,
          | 'score', 4)))
          | T(id, friends)
        """.stripMargin)
    df2.show(false)
    df2.printSchema()
    /**
      * +---+--------------------------+
      * |id |friends                   |
      * +---+--------------------------+
      * |1  |[[2, 49], [3, 15]]        |
      * |2  |[[4, 61], [2, 49], [3, 4]]|
      * +---+--------------------------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- friends: array (nullable = false)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- id: integer (nullable = false)
      * |    |    |-- score: integer (nullable = false)
      */

2. Если размерные данные большие


    // if df1 has big data
    val exploded = df2.select($"id", explode(expr("friends.id")).as("friend_id"))
      exploded.join(df1, exploded("friend_id")===df1("id"))
      .groupBy(exploded("id"))
      .agg(collect_list($"name").as("friends"))
      .show(false)
    /**
      * +---+-------------------+
      * |id |friends            |
      * +---+-------------------+
      * |2  |[Bob, Chris, Kevin]|
      * |1  |[Bob, Chris]       |
      * +---+-------------------+
      */

3. Если размер фрейма данных маленький

    // if df1 is small
    val b = spark.sparkContext.broadcast(df1.collect().map{case Row(id: Int, name: String) => id -> name}.toMap)

    val getFriendsName = udf((idArray: mutable.WrappedArray[Int]) => idArray.map(b.value(_)))

    df2.withColumn("friends", getFriendsName(expr("friends.id")))
      .show(false)

    /**
      * +---+-------------------+
      * |id |friends            |
      * +---+-------------------+
      * |1  |[Bob, Chris]       |
      * |2  |[Kevin, Bob, Chris]|
      * +---+-------------------+
      */
1 голос
/ 17 июня 2020

Посмотрите на это

df = spark.createDataFrame([('1' , 'Alex'), ('2' , 'Bob'), ('3' , 'Chris'), ('4' , 'Kevin')],['id' , 'name'])
df2 = spark.createDataFrame([('1',[[2, 49], [3, 15]]), ('2',[[4, 61], [2, 49], [3, 4]])], ['id' , 'friends'])

df3 = df2.select('id','friends' ,f.expr('''explode(transform(friends,x->x[0])) as friend'''))

df3.join(df,df.id.cast('int')==df3.friend.cast('int')).groupBy(df3.id,df3.friends).agg(f.collect_list('name').alias('friend')).show()

+---+--------------------+-------------------+
| id|             friends|             friend|
+---+--------------------+-------------------+
|  1|  [[2, 49], [3, 15]]|       [Chris, Bob]|
|  2|[[4, 61], [2, 49]...|[Chris, Kevin, Bob]|
+---+--------------------+-------------------+
...