Я думаю, что ваша попытка и общая идея в правильном направлении. Вот еще два подхода, основанные на встроенных параметрах, называемых get_json_object
/ from_json
через API-интерфейс DataFrame, и использовании преобразования map
вместе с json.dumps()
и json.loads()
в Python через RDD API.
Вариант 1: get_json_object () / from_json ()
Сначала давайте попробуем get_json_object()
, который не требует схемы:
import pyspark.sql.functions as f
df = spark.createDataFrame([
('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())
df.select(f.get_json_object("value", "$.id").alias("id"), \
f.get_json_object("value", "$.type").alias("type"), \
f.get_json_object("value", "$.data").alias("data"))
# +---+----+-----------------------------+
# |id |type|data |
# +---+----+-----------------------------+
# |1 |foo |{"key0":"foo","key2":"meh"} |
# |2 |bar |{"key2":"poo","key3":"pants"}|
# |3 |baz |{"key3":"moo"} |
# +---+----+-----------------------------+
Напротив from_json()
требуется определение схемы:
from pyspark.sql.types import StringType, StructType, StructField
import pyspark.sql.functions as f
df = spark.createDataFrame([
('{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}'),
('{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}'),
('{"id": 3, "type": "baz", "data": {"key3": "moo"}}')
], StringType())
schema = StructType([
StructField("id", StringType(), True),
StructField("type", StringType(), True),
StructField("data", StringType(), True)
])
df.select(f.from_json("value", schema).getItem("id").alias("id"), \
f.from_json("value", schema).getItem("type").alias("type"), \
f.from_json("value", schema).getItem("data").alias("data"))
# +---+----+-----------------------------+
# |id |type|data |
# +---+----+-----------------------------+
# |1 |foo |{"key0":"foo","key2":"meh"} |
# |2 |bar |{"key2":"poo","key3":"pants"}|
# |3 |baz |{"key3":"moo"} |
# +---+----+-----------------------------+
Опция 2: API map / RDD + json.dumps ()
from pyspark.sql.types import StringType, StructType, StructField
import json
df = spark.createDataFrame([
'{"id": 1, "type": "foo", "data": {"key0": "foo", "key2": "meh"}}',
'{"id": 2, "type": "bar", "data": {"key2": "poo", "key3": "pants"}}',
'{"id": 3, "type": "baz", "data": {"key3": "moo"}}'
], StringType())
def from_json(data):
row = json.loads(data[0])
return (row['id'], row['type'], json.dumps(row['data']))
json_rdd = df.rdd.map(from_json)
schema = StructType([
StructField("id", StringType(), True),
StructField("type", StringType(), True),
StructField("data", StringType(), True)
])
spark.createDataFrame(json_rdd, schema).show(10, False)
# +---+----+--------------------------------+
# |id |type|data |
# +---+----+--------------------------------+
# |1 |foo |{"key2": "meh", "key0": "foo"} |
# |2 |bar |{"key2": "poo", "key3": "pants"}|
# |3 |baz |{"key3": "moo"} |
# +---+----+--------------------------------+
Функция from_json
преобразует строку строки в кортеж (id, type, data)
. json.loads () проанализирует строку json и вернет словарь, через который мы создадим и вернем финальный кортеж.