PySpark: Как создать вложенный JSON из фрейма данных spark? - PullRequest
0 голосов
/ 26 ноября 2018

Я пытаюсь создать вложенный JSON из моего фрейма данных искры, который имеет данные в следующей структуре.Приведенный ниже код создает простой JSON с ключом и значением.Не могли бы вы помочь

df.coalesce(1).write.format('json').save(data_output_file+"createjson.json", overwrite=True)

Обновление 1: Согласно ответу @MaxU, я преобразовал фрейм искровых данных в pandas и использовал group by.Это помещает последние два поля во вложенный массив.Как я мог сначала поместить категорию и считать во вложенный массив, а затем внутри этого массива я хочу поместить подкатегорию и счет.

Пример текстовых данных:

Vendor_Name,count,Categories,Category_Count,Subcategory,Subcategory_Count
Vendor1,10,Category 1,4,Sub Category 1,1
Vendor1,10,Category 1,4,Sub Category 2,2
Vendor1,10,Category 1,4,Sub Category 3,3
Vendor1,10,Category 1,4,Sub Category 4,4

j = (data_pd.groupby(['vendor_name','vendor_Cnt','Category','Category_cnt'], as_index=False)
             .apply(lambda x: x[['Subcategory','subcategory_cnt']].to_dict('r'))
             .reset_index()
             .rename(columns={0:'subcategories'})
             .to_json(orient='records'))

enter image description here

[{
        "vendor_name": "Vendor 1",
        "count": 10,
        "categories": [{
            "name": "Category 1",
            "count": 4,
            "subCategories": [{
                    "name": "Sub Category 1",
                    "count": 1
                },
                {
                    "name": "Sub Category 2",
                    "count": 1
                },
                {
                    "name": "Sub Category 3",
                    "count": 1
                },
                {
                    "name": "Sub Category 4",
                    "count": 1
                }
            ]
        }]

Ответы [ 2 ]

0 голосов
/ 28 ноября 2018

Самый простой способ сделать это в python / pandas - использовать ряд вложенных генераторов, используя groupby Я думаю:

def split_df(df):
    for (vendor, count), df_vendor in df.groupby(["Vendor_Name", "count"]):
        yield {
            "vendor_name": vendor,
            "count": count,
            "categories": list(split_category(df_vendor))
        }

def split_category(df_vendor):
    for (category, count), df_category in df_vendor.groupby(
        ["Categories", "Category_Count"]
    ):
        yield {
            "name": category,
            "count": count,
            "subCategories": list(split_subcategory(df_category)),
        }

def split_subcategory(df_category):
    for row in df.itertuples():
        yield {"name": row.Subcategory, "count": row.Subcategory_Count}

list(split_df(df))
[
    {
        "vendor_name": "Vendor1",
        "count": 10,
        "categories": [
            {
                "name": "Category 1",
                "count": 4,
                "subCategories": [
                    {"name": "Sub Category 1", "count": 1},
                    {"name": "Sub Category 2", "count": 2},
                    {"name": "Sub Category 3", "count": 3},
                    {"name": "Sub Category 4", "count": 4},
                ],
            }
        ],
    }
]

Чтобы экспортировать этона json вам понадобится способ экспортировать np.int64

0 голосов
/ 26 ноября 2018

Для этого вам нужно реструктурировать весь фрейм данных.

«подкатегории» - это структурный стиль.

from pyspark.sql import functions as F
df.withColumn(
  "subCategories",
  F.struct(
    F.col("subCategories").alias("name"),
    F.col("subcategory_count").alias("count")
  )
)

, затем groupBy и используйте F.collect_list для создания массива.

В конце вам потребуется только 1 запись в вашем фрейме данных, чтобы получить ожидаемый результат.

...