Разобрать вложенную структуру JSON с помощью схемы изменений с помощью Spark DataFrame или RDD API - PullRequest
0 голосов
/ 08 апреля 2019

У меня много jsons с такой структурой

{
    "parent_id": "parent_id1",
    "devices" : "HERE_IS_STRUCT_SERIALIZED_AS_STRING_SEE BELOW"
}

{
    "0x0034" : { "id": "0x0034", "p1": "p1v1", "p2": "p2v1" },
    "0xAB34" : { "id": "0xAB34", "p1": "p1v2", "p2": "p2v2" },
    "0xCC34" : { "id": "0xCC34", "p1": "p1v3", "p2": "p2v3" },
    "0xFFFF" : { "id": "0xFFFF", "p1": "p1v4", "p2": "p2v4" },
    ....
    "0x0023" : { "id": "0x0023", "p1": "p1vN", "p2": "p2vN" },
}

Как видите, вместо создания массива объектов разработчики телеметрии сериализуют каждый элемент как свойство объекта, также имена свойств меняются в зависимости от идентификатора.

Используя Spark DataFrame или RDD API , я хочу преобразовать его в такую ​​таблицу

parent_id1, 0x0034, p1v1, p2v1
parent_id1, 0xAB34, p1v2, p2v2
parent_id1, 0xCC34, p1v3, p2v3
parent_id1, 0xFFFF, p1v4, p2v4
parent_id1, 0x0023, p1v5, p2v5

Вот пример данных:

{
    "parent_1": "parent_v1",
    "devices" : "{ \"0x0034\" : { \"id\": \"0x0034\", \"p1\": \"p1v1\", \"p2\": \"p2v1\" }, \"0xAB34\" : { \"id\": \"0xAB34\", \"p1\": \"p1v2\", \"p2\": \"p2v2\" }, \"0xCC34\" : { \"id\": \"0xCC34\", \"p1\": \"p1v3\", \"p2\": \"p2v3\" }, \"0xFFFF\" : { \"id\": \"0xFFFF\", \"p1\": \"p1v4\", \"p2\": \"p2v4\" }, \"0x0023\" : { \"id\": \"0x0023\", \"p1\": \"p1vN\", \"p2\": \"p2vN\" }}"
}

{
    "parent_2": "parent_v1",
    "devices" : "{ \"0x0045\" : { \"id\": \"0x0045\", \"p1\": \"p1v1\", \"p2\": \"p2v1\" }, \"0xC5C1\" : { \"id\": \"0xC5C1\", \"p1\": \"p1v2\", \"p2\": \"p2v2\" }}"
}

Желаемый вывод

parent_id1, 0x0034, p1v1, p2v1
parent_id1, 0xAB34, p1v2, p2v2
parent_id1, 0xCC34, p1v3, p2v3
parent_id1, 0xFFFF, p1v4, p2v4
parent_id1, 0x0023, p1v5, p2v5

parent_id2, 0x0045, p1v1, p2v1
parent_id2, 0xC5C1, p1v2, p2v2

Я думал о передаче устройств в качестве параметра функции from_json, а затем как-то преобразовать возвращаемый объект в массив JSON и затем взорвать его .... Но from_json хочет схему в качестве входных данных, но схема может меняться ...

1 Ответ

2 голосов
/ 10 апреля 2019

Возможно, есть более питонский или искровой способ сделать это, но у меня это сработало:

Входные данные

data = {
    "parent_id": "parent_v1",
    "devices" : "{ \"0x0034\" : { \"id\": \"0x0034\", \"p1\": \"p1v1\", \"p2\": \"p2v1\" }, \"0xAB34\" : { \"id\": \"0xAB34\", \"p1\": \"p1v2\", \"p2\": \"p2v2\" }, \"0xCC34\" : { \"id\": \"0xCC34\", \"p1\": \"p1v3\", \"p2\": \"p2v3\" }, \"0xFFFF\" : { \"id\": \"0xFFFF\", \"p1\": \"p1v4\", \"p2\": \"p2v4\" }, \"0x0023\" : { \"id\": \"0x0023\", \"p1\": \"p1vN\", \"p2\": \"p2vN\" }}"
}

Получить Dataframe

import json

def get_df_from_json(json_data):
    #convert string to json
    json_data['devices'] = json.loads(json_data['devices'])
    list_of_dicts = []
    for device_name, device_details in json_data['devices'].items():
        row = {
          "parent_id": json_data['parent_id'],
          "device": device_name
        }
        for key in device_details.keys():
            row[key] = device_details[key]
        list_of_dicts.append(row)
    return spark.read.json(sc.parallelize(list_of_dicts), multiLine=True)
display(get_df_from_json(data))

Выход

+--------+--------+------+------+-----------+
| device |   id   |  p1  |  p2  | parent_id |
+--------+--------+------+------+-----------+
| 0x0034 | 0x0034 | p1v1 | p2v1 | parent_v1 |
| 0x0023 | 0x0023 | p1vN | p2vN | parent_v1 |
| 0xFFFF | 0xFFFF | p1v4 | p2v4 | parent_v1 |
| 0xCC34 | 0xCC34 | p1v3 | p2v3 | parent_v1 |
| 0xAB34 | 0xAB34 | p1v2 | p2v2 | parent_v1 |
+--------+--------+------+------+-----------+
...