Вы можете использовать join
здесь, при условии, что вы сначала отобразите rdds
в форму (key, value)
.
rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])
def map_to_kvp(row):
if len(row) < 3:
return row
return (row[0], tuple(row[1:]))
rdd3 = rdd2.map(map_to_kvp).join(rdd1.map(map_to_kvp))
print(rdd3.collect())
#[
# (1, ('Foo', ('test1', [5, 6, 7]))),
# (2, ('Bar', ('test2', [1, 2, 3])))
#]
Теперь у вас есть вседанные в нужных местах, но вам просто нужно сгладить результирующие строки.
В этом случае вам придется написать собственную функцию flatten
, чтобы избежать сглаживания также string
и list
.
Мы можем опираться на этот ответ до Как можно сгладить списки без разделения строк? , чтобы создать собственную функцию:
def flatten(foo):
for x in foo:
if hasattr(x, '__iter__') and not isinstance(x, str) and not isinstance(x, list):
for y in flatten(x):
yield y
else:
yield x
rdd4 = rdd3.map(lambda row: tuple(flatten(row)))
print(rdd4.collect())
#[(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]