Писпарк |карта JSON RDD и применить трансляцию - PullRequest
0 голосов
/ 05 декабря 2018

В pyspark, как преобразовать входной RDD с JSON в указанный ниже вывод при применении широковещательной переменной к списку значений?

Ввод

[{'id': 1, 'title': "Foo", 'items': ['a','b','c']}, {'id': 2, 'title': "Bar", 'items': ['a','b','d']}]

Вещательная переменная

[('a': 5), ('b': 12), ('c': 42), ('d': 29)]

Желаемый выход

[(1, 'Foo', [5, 12, 42]), (2, 'Bar', [5, 12, 29])]

1 Ответ

0 голосов
/ 05 декабря 2018

Редактировать : Первоначально у меня сложилось впечатление, что функции, переданные функциям map, передаются автоматически, но после прочтения некоторых документов я больше не уверен в этом.

В любомВ этом случае вы можете определить свою широковещательную переменную:

bv = [('a', 5), ('b', 12), ('c', 42), ('d', 29)]

# turn into a dictionary
bv = dict(bv)
broadcastVar = sc.broadcast(bv)
print(broadcastVar.value)
#{'a': 5, 'c': 42, 'b': 12, 'd': 29}

Теперь она доступна на всех машинах как переменная только для чтения .Вы можете получить доступ к словарю, используя broascastVar.value:

Например:

import json

rdd = sc.parallelize(
    [
        '{"id": 1, "title": "Foo", "items": ["a","b","c"]}',
        '{"id": 2, "title": "Bar", "items": ["a","b","d"]}'
    ]
)

def myMapper(row):
    # define the order of the values for your output
    key_order = ["id", "title", "items"]

    # load the json string into a dict
    d = json.loads(row)

    # replace the items using the broadcast variable dict
    d["items"] = [broadcastVar.value.get(item) for item in d["items"]]

    # return the values in order
    return tuple(d[k] for k in key_order)

print(rdd.map(myMapper).collect())
#[(1, u'Foo', [5, 12, 42]), (2, u'Bar', [5, 12, 29])]
...