Как преобразовать файл .CSV в файл .Json с помощью Pyspark? - PullRequest
0 голосов
/ 08 декабря 2018

У меня возникла проблема при преобразовании файла .csv в многострочный файл json с помощью pyspark.

У меня есть файл csv, прочитанный через spark rdd, и мне нужно преобразовать его в многострочный json с помощью pyspark.

Вот мой код:

import json

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jsonconversion").getOrCreate()

df = spark.read.format("csv").option("header","True").load(csv_file)
df.show()
df_json = df.toJSON()

for row in df_json.collect():

line = json.loads(row)

result =[]



for key,value in list(line.items()):

    if key == 'FieldName':

        FieldName =line['FieldName']

        del line['FieldName']

        result.append({FieldName:line})

        res =result

        with open("D:/tasklist/jsaonoutput.json",'a+')as f:

            f.write(json.dumps(res, indent=4, separators=(',',':')))

Мне нужен вывод в следующем формате.

{
"Name":{
"DataType":"String",
"Length":4,
"Required":"Y",
"Output":"Y",
"Address": "N",
"Phone Number":"N",
"DoorNumber":"N/A"
"Street":"N",
"Locality":"N/A",
"State":"N/A"
  }
  }

Мой входной CSV-файл выглядит так:

enter image description here

Я новичок в Pyspark. Будем весьма благодарны за любые пожелания изменить этот код в рабочий код.

Заранее спасибо.

1 Ответ

0 голосов
/ 08 декабря 2018

Попробуйте следующий код.Сначала он создает фрейм данных pandas из spark DF (если вы не хотите делать что-то еще со spark df, вы можете загрузить CSV-файл непосредственно в pandas).Из pandas df он создает группы на основе столбца FieldName и затем записывает в файл, где json.dumps заботится о форматировании.

import json
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jsonconversion").getOrCreate()
df = spark.read.format("csv").option("header","True").load(csv_file)
df.show()

df_pandas_grped = df.toPandas().groupby('FieldName')
final_dict = {}
for key, grp in df_pandas_grped:
    final_dict[str(key)] = grp.to_dict('records') 

with open("D:/tasklist/jsaonoutput.json",'w')as f:
        f.write(json.dumps(final_dict,indent=4))
...