В pyspark как конвертировать rdd в json по другой схеме? - PullRequest
0 голосов
/ 04 декабря 2018

Как преобразовать приведенный ниже код для записи вывода json с помощью pyspark DataFrame, используя, df2.write.format('json')

  1. У меня есть список ввода (для примера только несколько элементов).
  2. Хотите написать json, который является более сложным / вложенным, чем ввод.
  3. Я попытался использовать rdd.map
  4. Проблема: вывод содержит апострофы для каждого объекта в json.
  5. Я не могу просто заменить строку, потому что сами данные могут ее содержать.
  6. Если есть лучший способ преобразовать схему во вложенный json с DataFrame, то в приведенном ниже примере вы можете показать, как?так как это может полностью разрешить апостроф.

Вот что я попробовал:

import json 

rdd = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])
df = rdd.toDF(["a","b","c"])
rddToJson = df.rdd.map(lambda x: json.dumps({"some_top_level_1": {"mycolumn1": x.a}})) // note that result json is complex and more nested than input
rddToJson.collect()

результат: содержит апострофы (не может заменить, он может появляться где угодно в значениях), как это сделатьэто с правильной схемой и датафреймом, а затем df.json.write?

result:

Out[20]: 
['{"some_top_level_1": {"mycolumn1": 1}}',
 '{"some_top_level_1": {"mycolumn1": 4}}',
 '{"some_top_level_1": {"mycolumn1": 7}}']

Моя цель (если это не может быть сделано другим способом) - использовать df.write.format('json') для того, чтобы написать вложенный / сложный json из вышеприведенного ввода.

PS: я увидел этот интересный пост: https://medium.com/@mrpowers/adding-structtype-columns-to-spark-dataframes-b44125409803, но, поскольку я новичок, я не был уверенкак я могу преобразовать входные данные в ту вложенную схему, которая мне нужна на выходе.

1 Ответ

0 голосов
/ 04 декабря 2018

Вы можете использовать функцию struct для создания вложенного кадра данных из плоской схемы.

import json 

rdd = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])
df = rdd.toDF(["a","b","c"])

df2 = df.withColumn("some_top_level_1", struct(col("a").alias("my_column1"))).select("some_top_level_1")
df2.coalesce(1).write.mode("overwrite").json("test.json")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...