PySpark: как создать структуру json? - PullRequest
0 голосов
/ 06 декабря 2018

Я пытаюсь создать JSON из приведенной ниже структуры.

Пример данных:

Country|SegmentID|total_cnt|max_value|
+---------+---------+---------+---------+
|     Pune|        1|     10.0|       15|
|    Delhi|        1|     10.0|       15|
|Bangalore|        1|     10.0|       15|
|     Pune|        2|     10.0|       16|
|    Delhi|        2|     10.0|       16|
|Bangalore|        2|     10.0|       16|
|     Pune|        3|     15.0|       16|
|    Delhi|        3|     10.0|       16|
|Bangalore|        3|     15.0|       16|
+---------+---------+---------+---------+

Вот мой код:

enter image description here

Ожидаемая структура JSON:

[{
        "NAME": "SEG1",
        "VAL": 15,
        "CITIES": {
            "Bangalore": 10,
            "Delhi": 10,
            "Pune": 10
        }
    },
    {
        "NAME": "SEG2",
        "VAL": 16,
        "CITIES": {
            "Bangalore": 10,
            "Delhi": 10,
            "Pune": 10
        }
    },
    {
        "NAME": "SEG3",
        "VAL": 16,
        "CITIES": {
            "Bangalore": 15,
            "Delhi": 10,
            "Pune": 15
        }
    }
]

Я могу создать одноуровневую иерархию, но это также не удовлетворяет моим требованиям.

join_df=join_df.toPandas()
j = (join_df.groupby(['SegmentID','max_value'], as_index=False)
                .apply(lambda x: x[['Country','total_cnt']].to_dict('r'))
                .reset_index().rename(columns={0:'CITIES'})
                .to_json(orient='records'))

Это дает такой результат:

[{"SegmentID":1,"max_value":15,"Cities":[{"Country":"Pune","total_cnt":10.0},{"Country":"Delhi","total_cnt":10.0},{"Country":"Bangalore","total_cnt":10.0}]},{"SegmentID":2,"max_value":16,"Cities":[{"Country":"Pune","total_cnt":10.0},{"Country":"Delhi","total_cnt":10.0},{"Country":"Bangalore","total_cnt":10.0}]},{"SegmentID":3,"max_value":16,"Cities":[{"Country":"Pune","total_cnt":15.0},{"Country":"Delhi","total_cnt":10.0},{"Country":"Bangalore","total_cnt":15.0}]}]

1 Ответ

0 голосов
/ 06 декабря 2018

Вы можете преобразовать Dataframe в RDD и применить ваши преобразования:

from pyspark.sql.types import *
import json

NewSchema = StructType([StructField("Name", StringType())
                           , StructField("VAL", IntegerType())
                           , StructField("CITIES", StringType())
                        ])

def reduceKeys(row1, row2):
        row1[0].update(row2[0])
        return row1

res_df = join_df.rdd.map(lambda row: ("SEG" + str(row[1]), ({row[0]: row[2]}, row[3])))\
    .reduceByKey(lambda x, y: reduceKeys(x, y))\
    .map(lambda row: (row[0], row[1][1], json.dumps(row[1][0])))\
    .toDF(NewSchema)

Вот результат:

res_df.show(20, False)


+----+---+------------------------------------------------+
|Name|VAL|CITIES                                          |
+----+---+------------------------------------------------+
|SEG1|15 |{"Pune": 10.0, "Delhi": 10.0, "Bangalore": 10.0}|
|SEG3|16 |{"Pune": 15.0, "Delhi": 10.0, "Bangalore": 15.0}|
|SEG2|16 |{"Pune": 10.0, "Delhi": 10.0, "Bangalore": 10.0}|
+----+---+------------------------------------------------+

Теперь вы можете сохранить его в файле JSON:

res_df.coalesce(1).write.format('json').save('output.json')
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...