pyspark rdd объединяет несколько файлов json в один rdd - PullRequest
0 голосов
/ 07 ноября 2019

Я пытаюсь объединить данные двух файлов в один rdd. Допустим, у меня есть два файла file1.txt, который является большим файлом в формате json, file2.txt, который является небольшим файлом в формате CSV.

file1.txt (формат json) выглядит так:

{

{"a":1 , "b":{"ba":1 , "bb":"babc", "bc":"babc2", "d":"babc3"}, "c":"abc2"}, 

{"a":2 , "b":{"ba":2 , "bb":"babc2", "bc":"babc22", "d":"babc32"}, "c":"abc22"}

}

file2.txt (формат csv) выглядит следующим образом:

key   value

xyz   xyz1

pqr   pqr1

Теперь я хочу, чтобы мой вывод был rdd следующим образом:

{

{"a":1 , "b":{"ba":1 , "bb":"babc", "bc":"babc2", "d":"babc3", "e":{{"key": "xyz", "value":"xyz1"},{"key": "pqr", "value":"pqr1"}}, "c":"abc2"}, 

{"a":2 , "b":{"ba":2 , "bb":"babc2", "bc":"babc22", "d":"babc32", "e":{{"key": "xyz", "value":"xyz1"},{"key": "pqr", "value":"pqr1"}}, "c":"abc22"}

}

Я попытался преобразовать file2.txt в формате json, а затем выполните операцию, например:

output_rdd = file1_rdd.map(lambda x: joinfunc(x, file2_rdd))

А затем попробуйте написать output_rdd.

Но это дает мне такие ошибки, как

cPickle.PicklingError: Не удалось сериализовать объект: Py4JError: Произошла ошибка при вызове o72. getnewargs . Трассировка: py4j.Py4JException: Method getnewargs ([]) не существует

Любое предложение объединить эти два файла в один выходной RDD? Любые предложения будут полезны.

PS: я совершенно новичок в искре.

Редактировать:

мой joinfunc выглядит так:

def joinfunc(file1_json, file2_json):

    file1_detail = json.loads(file1_json)
    file2_detail = json.loads(file2_json)

    b_file1_detail_list = file1_detail["b"]
    append_detail =[]

    if b_file1_detail_list is not None:
        for b_file1_detail in b_file1_detail_list:
            append_detail = {"e" : file2_detail}
            b_file1_detail.append(append_detail)

    return json.dumps(dict(file1_detail))
...