Pyspark считал JSON как диктовку или структуру, а не как фрейм данных / RDD - PullRequest
0 голосов
/ 29 января 2020

У меня есть файл JSON, сохраненный в S3, который я пытаюсь открыть / прочитать / сохранить / как угодно в формате dict или struct в PySpark. Это выглядит примерно так:

{
    "filename": "some_file.csv",
    "md5": "md5 hash",
    "client_id": "some uuid",
    "mappings": {
        "shipping_city": "City",
        "shipping_country": "Country",
        "shipping_zipcode": "Zip",
        "shipping_address1": "Street Line 1",
        "shipping_address2": "Street Line 2",
        "shipping_state_abbreviation": "State"
    }
}

И я хотел бы прочитать его из S3 и сохранить как словарь или структуру. Когда я читаю это так:

inputJSON = "s3://bucket/file.json"
dfJSON = sqlContext.read.json(inputJSON, multiLine=True)

я получаю кадр данных, который отбрасывает сопоставления и выглядит так:

+---------+-------------+----------------------------------------------------------+-------+
|client_id|filename     |mappings                                                  |md5    |
+-----------------------+----------------------------------------------------------+-------+
|some uuid|some_file.csv|[City, Country, Zip, Street Line 1, Street Line 2, State] |md5hash|
+-----------------------+----------------------------------------------------------+-------+

Можно ли открыть файл и прочитать его в словарь, чтобы я мог получить доступ к сопоставлениям или другим подобным вещам ?:

jsonDict = inputFile
mappingDict = jsonDict['mappings']

Ответы [ 2 ]

0 голосов
/ 29 января 2020

Я смог решить эту проблему, добавив boto3 в кластер EMR и используя следующий код:

import boto3
import json

s3 = boto3.resource('s3')
obj = s3.Object('slm-transaction-incoming','All_Starbucks_Locations_in_the_US.json')
string = obj.get()['Body'].read().decode('utf-8')

json = json.loads(string)

Чтобы добавить boto3, введите в терминал EMR следующее:

sudo pip-3.6 install boto3
0 голосов
/ 29 января 2020

Вы можете попробовать что-то вроде этого:

inputJSON = "/tmp/some_file.json"
dfJSON = spark.read.json(inputJSON, multiLine=True)

dfJSON.printSchema()


root
 |-- client_id: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- mappings: struct (nullable = true)
 |    |-- shipping_address1: string (nullable = true)
 |    |-- shipping_address2: string (nullable = true)
 |    |-- shipping_city: string (nullable = true)
 |    |-- shipping_country: string (nullable = true)
 |    |-- shipping_state_abbreviation: string (nullable = true)
 |    |-- shipping_zipcode: string (nullable = true)
 |-- md5: string (nullable = true)


dict_mappings = dfJSON.select("mappings").toPandas().set_index('mappings').T.to_dict('list')

dict_mappings

{Row(shipping_address1='Street Line 1', shipping_address2='Street Line 2', shipping_city='City', shipping_country='Country', shipping_state_abbreviation='State', shipping_zipcode='Zip'): []}

ИЛИ (без Pandas)

list_map = map(lambda row: row.asDict(), dfJSON.select("mappings").collect())
dict_mappings2 = {t['mappings']: t for t in list_map}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...