Вы можете действовать следующим образом,
import pyspark.sql.functions as F
from pyspark.sql.types import *
df1 = sql.createDataFrame([
(0,['B', 'C', 'D', 'E']),
(1,['E', 'A', 'C']),
(2,['F', 'A', 'E', 'B']),
(3,['E', 'G', 'A']),
(4,['A', 'C', 'E', 'B', 'D'])],
['id1','items1'])
df2 = sql.createDataFrame([
(001,['A', 'C']),
(002,['D']),
(003,['E', 'A', 'B']),
(004,['B', 'D', 'C']),
(005,['F', 'B']),
(006,['G', 'E'])],
['id2','items2'])
, который дает вам кадры данных,
+---+---------------+
|id1| items1|
+---+---------------+
| 0| [B, C, D, E]|
| 1| [E, A, C]|
| 2| [F, A, E, B]|
| 3| [E, G, A]|
| 4|[A, C, E, B, D]|
+---+---------------+
+---+---------+
|id2| items2|
+---+---------+
| 1| [A, C]|
| 2| [D]|
| 3|[E, A, B]|
| 4|[B, D, C]|
| 5| [F, B]|
| 6| [G, E]|
+---+---------+
Теперь, crossJoin
два кадра данных, которые дают вам декартово произведение df1
с df2
.Затем groupby
на 'items1'
и примените udf
, чтобы получить 'result_array'
.
get_array_udf = F.udf(lambda x,y:[1.0 if set(z) < set(x) else 0.0 for z in y], ArrayType(FloatType()))
df = df1.crossJoin(df2)\
.groupby(['id1', 'items1']).agg(F.collect_list('items2').alias('items2'))\
.withColumn('result_array', get_array_udf('items1', 'items2')).drop('items2')
df.show()
. Это даст вам вывод:
+---+---------------+------------------------------+
|id1|items1 |result_array |
+---+---------------+------------------------------+
|1 |[E, A, C] |[1.0, 0.0, 0.0, 0.0, 0.0, 0.0]|
|0 |[B, C, D, E] |[0.0, 1.0, 0.0, 1.0, 0.0, 0.0]|
|4 |[A, C, E, B, D]|[1.0, 1.0, 1.0, 1.0, 0.0, 0.0]|
|3 |[E, G, A] |[0.0, 0.0, 0.0, 0.0, 0.0, 1.0]|
|2 |[F, A, E, B] |[0.0, 0.0, 1.0, 0.0, 1.0, 0.0]|
+---+---------------+------------------------------+