Spark: объединить два кадра данных в столбце типа массива - PullRequest
0 голосов
/ 20 апреля 2020

У меня есть простой вариант использования. У меня есть два фрейма данных df1 и df2, и я ищу эффективный способ присоединения к ним?

df1: содержит мой основной фрейм данных (миллиарды записей)

+--------+-----------+--------------+
|doc_id  |doc_name   |doc_type_id   |
+--------+-----------+--------------+
|   1    |doc_name_1 |[1,4]         |
|   2    |doc_name_2 |[3,2,6]       |
+--------+-----------+--------------+

df2: Содержит метки типов do c (40000 записей), поскольку я передаю их в небольшом количестве.

+------------+----------------+
|doc_type_id |doc_type_name   |
+------------+----------------+
|   1        |doc_type_1      |
|   2        |doc_type_2      |
|   3        |doc_type_3      |
|   4        |doc_type_4      |
|   5        |doc_type_5      |
|   6        |doc_type_5      |
+------------+----------------+

Я хотел бы объединить эти два кадра данных, чтобы получить что-то вроде этого:

+--------+------------+--------------+----------------------------------------+
|doc_id  |doc_name    |doc_type_id   |doc_type_name                           |
+--------+------------+--------------+----------------------------------------+
|   1    |doc_name_1  |[1,4]         |["doc_type_1","doc_type_4"]             |
|   2    |doc_name_2  |[3,2,6]       |["doc_type_3","doc_type_2","doc_type_6"]|
+--------+------------+--------------+----------------------------------------+

Спасибо

1 Ответ

1 голос
/ 20 апреля 2020

Мы можем использовать array_contains + groupBy + collect_list функции для этого случая.

Пример:

val df1=Seq(("1","doc_name_1",Seq(1,4)),("2","doc_name_2",Seq(3,2,6))).toDF("doc_id","doc_name","doc_type_id")

val df2=Seq(("1","doc_type_1"),("2","doc_type_2"),("3","doc_type_3"),("4","doc_type_4"),("5","doc_type_5"),("6","doc_type_6")).toDF("doc_type_id","doc_type_name")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

df1.createOrReplaceTempView("tbl")
df2.createOrReplaceTempView("tbl2")

spark.sql("select a.doc_id,a.doc_name,a.doc_type_id,collect_list(b.doc_type_name) doc_type_name from tbl a join tbl2 b on array_contains(a.doc_type_id,int(b.doc_type_id)) = TRUE group by a.doc_id,a.doc_name,a.doc_type_id").show(false)

//+------+----------+-----------+------------------------------------+
//|doc_id|doc_name  |doc_type_id|doc_type_name                       |
//+------+----------+-----------+------------------------------------+
//|2     |doc_name_2|[3, 2, 6]  |[doc_type_2, doc_type_3, doc_type_6]|
//|1     |doc_name_1|[1, 4]     |[doc_type_1, doc_type_4]            |
//+------+----------+-----------+------------------------------------+

Другой способ достичь можно с помощью explode + join + collect_list:

val df3=df1.withColumn("arr",explode(col("doc_type_id")))

df3.join(df2,df2.col("doc_type_id") === df3.col("arr"),"inner").
groupBy(df3.col("doc_id"),df3.col("doc_type_id"),df3.col("doc_name")).
agg(collect_list(df2.col("doc_type_name")).alias("doc_type_name")).
show(false)

//+------+-----------+----------+------------------------------------+
//|doc_id|doc_type_id|doc_name  |doc_type_name                       |
//+------+-----------+----------+------------------------------------+
//|1     |[1, 4]     |doc_name_1|[doc_type_1, doc_type_4]            |
//|2     |[3, 2, 6]  |doc_name_2|[doc_type_2, doc_type_3, doc_type_6]|
//+------+-----------+----------+------------------------------------+
...