Исключительная реализация объединения может обрабатывать столбцы массива без проблем.Единственная проблема заключается в том, что он не игнорирует порядок вашего столбца.Поэтому перед тем, как правильно объединить, необходимо отсортировать столбец объединения.Для этого вы можете использовать функцию sort_array .
from pyspark.sql import functions as F
df1 = spark.createDataFrame(
[
( [1828545, 1242385], 4),
( [1828545, 2032007], 4),
( [1137808], 11),
( [1209448], 5),
( [21002], 5),
( [2793224], 209),
( [2793224, 8590], 7),
([2793224, 8590, 81], 4),
([2793224, 8590, 82], 4),
([2793224, 8590, 83], 5),
([2793224, 8590, 11], 4),
( [2793224, 2593971], 20)
], ['items','freq'])
df2 = spark.createDataFrame(
[
('WLB2T1JWGTHH','0012c5936056e',[1828545, 1242385] ),
('BZTAWYQ70C7N','00783934ea027',[2793224, 8590] ),
('42L1RJL436ST','00c6821ed171e',[8590, 2793224] ),
('HB348HWSJAOP','00fa9607ead50',[21002] ),
('I9FOENUQL1F1','013f69b45bb58',[21002] )
], ['user_id', 'session_id', 'itemset'])
df1 = df1.withColumn('items', F.sort_array('items'))
df2 = df2.withColumnRenamed('itemset', 'items').withColumn('items', F.sort_array('items'))
df1.join(df2, "items").show()
Вывод:
+------------------+----+------------+-------------+
| items|freq| user_id| session_id|
+------------------+----+------------+-------------+
| [8590, 2793224]| 7|BZTAWYQ70C7N|00783934ea027|
| [8590, 2793224]| 7|42L1RJL436ST|00c6821ed171e|
|[1242385, 1828545]| 4|WLB2T1JWGTHH|0012c5936056e|
| [21002]| 5|HB348HWSJAOP|00fa9607ead50|
| [21002]| 5|I9FOENUQL1F1|013f69b45bb58|
+------------------+----+------------+-------------+