Разобрать произвольный JSON с помощью Spark's from_json - PullRequest
0 голосов
/ 14 мая 2018

У меня есть набор данных, который выглядит следующим образом:

~ ❯ head example.csv
ix,value
1,{"abc": {"name": "bob", "profession": "engineer"}}
2,{"def": {"name": "sarah", "profession": "scientist"}, "ghi": {"name": "matt", "profession": "doctor"}}

Столбец value содержит BLOB-объекты JSON.Как вы можете видеть, каждый BLOB-объект JSON имеет форму {A: B}, где A - случайная / произвольная строка, а B - относительно правильно сформированный объект JSON.

Результат, который я хочу получитьотсюда:

ix,names,professions
1,[bob],[engineer]
2,[sarah,matt],[scientist,doctor]

Чтобы потом взорваться в это:

ix,name,profession
1,bob,engineer
2,sarah,scientist
2,matt,doctor

Поскольку я не знаю возможных ключей A, у меня возникают трудности с анализом BLOB-объекта JSON вStructType (я не могу перечислить все возможные ключи) или MapType (не поддерживается from_json):

>>> rdd.withColumn('parsed', F.from_json(F.col('value'), MapType(StringType(), MapType(StringType(), StringType(), False), False)))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/gberger/Projects/spark/python/pyspark/sql/dataframe.py", line 1800, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
  File "/Users/gberger/Projects/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/Users/gberger/Projects/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'jsontostructs(`value`)' due to data type mismatch: Input schema map<string,map<string,string>> must be a struct or an array of structs.;;\n'Project [id#35, value#36, jsontostructs(MapType(StringType,MapType(StringType,StringType,false),false), value#36, Some(Europe/London)) AS parsed#46]\n+- Relation[id#35,value#36] csv\n"

Я знаю, что могу использовать UDF, но это сильно повлияет на производительность;Я хотел бы по возможности придерживаться нативных функций Spark.

1 Ответ

0 голосов
/ 03 июля 2018

Вы можете использовать что-то вроде:

сначала определите свою схему:

jsonSchema = StructType([ StructField("name", StringType(), True),
                          StructField("profession", StringType(), True)
                        ])

df = df.withColumn("value", from_json(df["value"], jsonSchema))

выберите свойства json и сформируйте фрейм данных

df = df.select("value.name", "value.profession")

Надеюсь, что это отвечает на ваш запрос.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...