Вот что вы можете сделать:
- Определить схему и преобразовать плоский json в массив данных, используя схему.
- Зарегистрировать пару пользовательских функций для построения карты пользователя и события.
- Добавление новых столбцов (пользователь и событие) в кадре данных с использованием регистра UDF в # 2
- Удаление лишних столбцов
Вот полный код:
from pyspark.sql.types import (
StringType,
StructField,
StructType,
MapType
)
from pyspark.sql.functions import udf
events_schema = StructType([
StructField('event_type', StringType(), True),
StructField('id', StringType(), True),
StructField('person_id', StringType(), True),
StructField('category', StringType(), True),
StructField('approved_content', StringType(), True),
])
events = [{
'event_type': 'click',
'id': '223',
'person_id': 201031940,
'category': 'Chronicles',
'approved_content': 1
}]
df = spark.createDataFrame(events, schema=events_schema)
build_user_udf = udf(lambda id, person_id: {
'id': id,
'person_id': person_id
}, MapType(StringType(), StringType()))
build_event_udf = udf(lambda category, approved_content: {
'category': category,
'approved_content': approved_content
}, MapType(StringType(), StringType()))
nested_event_df = (
df
.withColumn('user', build_user_udf(df['id'], df['person_id']))
.withColumn('event', build_event_udf(df['category'], df['approved_content']))
.drop('id')
.drop('person_id')
.drop('category')
.drop('approved_content')
)
nested_event_df.toJSON (). First ()
'{"event_type": "click", "user": {"id": "223", "person_id":"201031940"}, "event": {"mitted_content ":" 1 "," category ":" Chronicles "}} '
nested_event_df.take (1)
[Row (event_type)= 'click', user = {'id': '223', 'person_id': '201031940'}, событие = {'mitted_content ':' 1 ',' category ':' Chronicles '})]
Это довольно простая версия, но вы можете оптимизировать ее, если хотите.