Первое, что вы хотите сделать, это explode
значения в df2.items2
, чтобы содержимое массивов находилось в отдельных строках:
from pyspark.sql.functions import explode
df2 = df2.select(explode("items2").alias("items2"))
df2.show()
#+------+
#|items2|
#+------+
#| B|
#| A|
#| C|
#| E|
#+------+
(Предполагается, чточто значения в df2.items2
различны - если нет, вам необходимо добавить df2 = df2.distinct()
.)
Опция 1 : использовать crossJoin
:
СейчасВы можете crossJoin
новый df2
вернуться к df1
и сохранить только те строки, где df1.items1
содержит элемент в df2.items2
.Мы можем достичь этого, используя pyspark.sql.functions.array_contains
и этот трюк , который позволяет нам использовать значение столбца в качестве параметра .
После фильтрации сгруппируйте по id1
и items1
и агрегируйте, используя pyspark.sql.functions.collect_list
from pyspark.sql.functions import expr, collect_list
df1.alias("l").crossJoin(df2.alias("r"))\
.where(expr("array_contains(l.items1, r.items2)"))\
.groupBy("l.id1", "l.items1")\
.agg(collect_list("r.items2").alias("items1_updated"))\
.show()
#+---+---------------+--------------+
#|id1| items1|items1_updated|
#+---+---------------+--------------+
#| 1| [E, A, C]| [A, C, E]|
#| 0| [B, C, D, E]| [B, C, E]|
#| 4|[A, C, E, B, D]| [B, A, C, E]|
#| 3| [E, G, A]| [A, E]|
#| 2| [F, A, E, B]| [B, A, E]|
#+---+---------------+--------------+
Опция 2 : Разнесение df1.items1
и левое соединение:
Другой вариант - explode
содержимое items1
в df1
и сделать левое соединение.После объединения мы должны сделать аналогичную группировку и агрегирование, как указано выше.Это работает, потому что collect_list
будет игнорировать значения null
, введенные несоответствующими строками
df1.withColumn("items1", explode("items1")).alias("l")\
.join(df2.alias("r"), on=expr("l.items1=r.items2"), how="left")\
.groupBy("l.id1")\
.agg(
collect_list("l.items1").alias("items1"),
collect_list("r.items2").alias("items1_updated")
).show()
#+---+---------------+--------------+
#|id1| items1|items1_updated|
#+---+---------------+--------------+
#| 0| [E, B, D, C]| [E, B, C]|
#| 1| [E, C, A]| [E, C, A]|
#| 3| [E, A, G]| [E, A]|
#| 2| [F, E, B, A]| [E, B, A]|
#| 4|[E, B, D, C, A]| [E, B, C, A]|
#+---+---------------+--------------+