Создать вложенный JSON из PySpark Dataframe - PullRequest
0 голосов
/ 20 сентября 2019

Этот плоский json для json вложен в pyspark.

{
    'event_type': 'click', 
    'id': '223',
    'person_id': 201031940, 
    'category': 'Chronicles', 
    'approved_content': 1
}

to

{
    'event_type': 'click', 
    user: {
        'id': '223',
        'person_id': 201031940
    },
    event: {
        'category': 'Chronicles', 
        'approved_content': 1
    }
}

Ответы [ 2 ]

1 голос
/ 21 сентября 2019

Вы также можете сделать это без использования udfs, что является более эффективным и значительно меняет ситуацию, если вы работаете с большим количеством записей:

import pyspark.sql.fuctions as f
events_schema = StructType([
    StructField('event_type', StringType(), True),
    StructField('id', StringType(), True),
    StructField('person_id', StringType(), True),
    StructField('category', StringType(), True),
    StructField('approved_content', StringType(), True),
])

events = [{
    'event_type': 'click',
    'id': '223',
    'person_id': 201031940,
    'category': 'Chronicles',
    'approved_content': 1
}]
df = spark.createDataFrame(events, schema=events_schema)
newDf = (df
          .withColumn('user', f.struct(df.id, df.person_id))
          .withColumn('event', f.struct(df.category, df.approved_content))
          .withColumn('nestedEvent', f.struct(f.col('user'), f.col('event')))
          .select('nestedEvent'))
1 голос
/ 21 сентября 2019

Вот что вы можете сделать:

  1. Определить схему и преобразовать плоский json в массив данных, используя схему.
  2. Зарегистрировать пару пользовательских функций для построения карты пользователя и события.
  3. Добавление новых столбцов (пользователь и событие) в кадре данных с использованием регистра UDF в # 2
  4. Удаление лишних столбцов

Вот полный код:

from pyspark.sql.types import (
    StringType,
    StructField,
    StructType,
    MapType
)
from pyspark.sql.functions import udf

events_schema = StructType([
    StructField('event_type', StringType(), True),
    StructField('id', StringType(), True),
    StructField('person_id', StringType(), True),
    StructField('category', StringType(), True),
    StructField('approved_content', StringType(), True),
])

events = [{
    'event_type': 'click',
    'id': '223',
    'person_id': 201031940,
    'category': 'Chronicles',
    'approved_content': 1
}]
df = spark.createDataFrame(events, schema=events_schema)

build_user_udf = udf(lambda id, person_id: {
    'id': id,
    'person_id': person_id
}, MapType(StringType(), StringType()))

build_event_udf = udf(lambda category, approved_content: {
    'category': category,
    'approved_content': approved_content
}, MapType(StringType(), StringType()))

nested_event_df = (
    df
    .withColumn('user', build_user_udf(df['id'], df['person_id']))
    .withColumn('event', build_event_udf(df['category'], df['approved_content']))
    .drop('id')
    .drop('person_id')
    .drop('category')
    .drop('approved_content')
)

nested_event_df.toJSON (). First ()

'{"event_type": "click", "user": {"id": "223", "person_id":"201031940"}, "event": {"mitted_content ":" 1 "," category ":" Chronicles "}} '

nested_event_df.take (1)

[Row (event_type)= 'click', user = {'id': '223', 'person_id': '201031940'}, событие = {'mitted_content ':' 1 ',' category ':' Chronicles '})]

Это довольно простая версия, но вы можете оптимизировать ее, если хотите.

...