вы можете использовать оператор cogroup в сочетании с определенной пользователем функцией для создания различных вариантов объединения. Предположим, у нас есть два этих RDD в качестве примера:
visits = sc.parallelize([("h", "1.2.3.4"), ("a", "3.4.5.6"), ("h","1.3.3.1")] )
pageNames = sc.parallelize([("h", "Home"), ("a", "About"), ("o", "Other")])
cg = visits.cogroup(pageNames).map(lambda x :(x[0], ( list(x[1][0]), list(x[1][1]))))
Вы можете реализовать внутреннее объединение следующим образом:
innerjoin = cg.flatMap(lambda x: J(x))
Где J определен как таковой:
def J(x):
j=[]
k=x[0]
if x[1][0]!=[] and x[1][1]!=[]:
for l in x[1][0]:
for r in x[1][1]:
j.append((k,(l,r)))
return j
Для правого внешнего соединения, например, вам просто нужно изменить функцию J на функцию roJ, определенную так:
def roJ(x):
j=[]
k=x[0]
if x[1][0]!=[] and x[1][1]!=[]:
for l in x[1][0]:
for r in x[1][1]:
j.append((k,(l,r)))
elif x[1][1]!=[] :
for r in x[1][1]:
j.append((k, (None, r)))
return j
И вызвать ее так:
rightouterjoin = cg.flatMap(lambda x: roJ(x))
И так для других типов объединения, вы бы sh реализовали