Updating my answer: I used a udf to put the JSON key into each element of its array, and then explode to get the desired result.
See the example below:
import json
import re
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    ('test:1',
     '{"test1":[{"Id":"17","cName":"c1"},{"Id":"01","cName":"c2","pScore":0.003609}]},'
     '{"test8":[{"Id":"1","cName":"c11","pScore":0.0},{"Id":"012","cName":"c2","pScore":0.003609}]}'),
    ('test:2', '{"test1:subtest2":[{"Id":"18","cName":"c13","pScore":0.00203}]}')
], ['col1', 'col2'])
schema = ArrayType(
    StructType(
        [
            # 'Col' holds the JSON key that the udf attaches to each element
            StructField("Col", StringType()),
            StructField("Id", StringType()),
            StructField("cName", StringType()),
            StructField("pScore", DoubleType())
        ]
    )
)
@f.udf(returnType=schema)
def parse_col(column):
    updated_values = []
    # col2 can contain several JSON objects separated by commas, so the string
    # as a whole is not valid JSON; pull out each object with a regex first.
    for it in re.finditer(r'{.*?}]}', column):
        parse = json.loads(it.group())
        for key, values in parse.items():
            for value in values:
                # attach the JSON key to every element of its array
                value['Col'] = key
                updated_values.append(value)
    return updated_values
df = df \
    .withColumn('tmp', parse_col(f.col('col2'))) \
    .withColumn('tmp', f.explode(f.col('tmp'))) \
    .select(f.col('col1'),
            f.col('tmp').Col.alias('col2'),
            f.col('tmp').Id.alias('Id'),
            f.col('tmp').cName.alias('cName'),
            f.col('tmp').pScore.alias('pScore'))
df.show()
Output:
+------+--------------+---+-----+--------+
| col1| col2| Id|cName| pScore|
+------+--------------+---+-----+--------+
|test:1| test1| 17| c1| null|
|test:1| test1| 01| c2|0.003609|
|test:1| test8| 1| c11| 0.0|
|test:1| test8|012| c2|0.003609|
|test:2|test1:subtest2| 18| c13| 0.00203|
+------+--------------+---+-----+--------+
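If you would rather avoid a Python udf, the same result can be reached with the built-in from_json: wrapping col2 in [...] turns the comma-separated objects into one valid JSON array of maps, which Spark can then explode. This is only a sketch under that assumption; the names inner, no_udf_schema, df2, parsed and item are mine, and it assumes df is still the raw two-column frame created at the top (the snippet above reassigns df, so recreate it first).

from pyspark.sql.types import MapType

inner = ArrayType(StructType([
    StructField("Id", StringType()),
    StructField("cName", StringType()),
    StructField("pScore", DoubleType())
]))
no_udf_schema = ArrayType(MapType(StringType(), inner))

df2 = (df
    .withColumn('parsed',
                f.from_json(f.concat(f.lit('['), f.col('col2'), f.lit(']')),
                            no_udf_schema))
    # one row per JSON object in col2
    .withColumn('parsed', f.explode(f.col('parsed')))
    # explode the map into key / value columns
    .select('col1', f.explode(f.col('parsed')))
    # one row per struct in each array
    .withColumn('item', f.explode(f.col('value')))
    .select(f.col('col1'),
            f.col('key').alias('col2'),
            f.col('item').Id.alias('Id'),
            f.col('item').cName.alias('cName'),
            f.col('item').pScore.alias('pScore')))
df2.show()

The trade-off: from_json keeps the parsing inside the JVM, while the udf round-trips every row through a Python worker.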