Объединение двух RDD с несколькими компонентами стоимости и выравнивание результата - PullRequest
0 голосов
/ 05 декабря 2018

У меня есть 2 RDD с одним и тем же ключом, но разными типами значений (более 2-х значений).Я хочу присоединиться к этим СДР по ключу и добавить их значения в последнем кортеже (см. Ниже).Какой лучший способ сделать это?

rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])

Требуемый выходной СДР

[ (1, "Foo", "test1", [5,6,7]), (2, "Bar", "test2", [1,2,3]) ]

Выполнение прямого объединения не работает:

print(rdd2.join(rdd1).collect())
#[(1, ('Foo', 'test1')), (2, ('Bar', 'test2'))]

При этом игнорируются остальные значения в rdd1, и выходные данные имеют неправильный формат.

1 Ответ

0 голосов
/ 05 декабря 2018

Вы можете использовать join здесь, при условии, что вы сначала отобразите rdds в форму (key, value).

rdd1 = sc.parallelize([ (1, "test1", [5,6,7]), (2, "test2", [1,2,3]) ])
rdd2 = sc.parallelize([ (1, "Foo"), (2, "Bar") ])

def map_to_kvp(row):
    if len(row) < 3:
        return row
    return (row[0], tuple(row[1:]))

rdd3 = rdd2.map(map_to_kvp).join(rdd1.map(map_to_kvp))
print(rdd3.collect())
#[
#    (1, ('Foo', ('test1', [5, 6, 7]))), 
#    (2, ('Bar', ('test2', [1, 2, 3])))
#]

Теперь у вас есть вседанные в нужных местах, но вам просто нужно сгладить результирующие строки.

В этом случае вам придется написать собственную функцию flatten, чтобы избежать сглаживания также string и list.

Мы можем опираться на этот ответ до Как можно сгладить списки без разделения строк? , чтобы создать собственную функцию:

def flatten(foo):
    for x in foo:
        if hasattr(x, '__iter__') and not isinstance(x, str) and not isinstance(x, list):
            for y in flatten(x):
                yield y
        else:
            yield x

rdd4 = rdd3.map(lambda row: tuple(flatten(row)))
print(rdd4.collect())
#[(1, 'Foo', 'test1', [5, 6, 7]), (2, 'Bar', 'test2', [1, 2, 3])]
...