Pyspark конвертирует массив json в строки данных - PullRequest
0 голосов
/ 23 января 2020

новичок pyspark здесь - у меня есть фрейм данных spark, где каждая строка - это URL на s3. каждый URL является файлом GZIP массива JSON, я могу проанализировать каждую строку (ссылку) в кадре данных в список python, но я не знаю, как создать несколько строк из этого списка JSON.

это функция, которую я использовал, которая возвращает список jsons:

def distributed_read_file(url):
    s3_client = boto3.client('s3')
    result = s3_client.get_object(Bucket=raw_data_bucket_name, Key=url)
    bytestream = BytesIO(result['Body'].read())
    string_json = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
    list_of_jsons = json.loads(string_json) 

Если, например, это JSON объекты из списка:

[{"a": 99, "b": 102}, {"a": 43, "b": 87}]

Я хочу запустить функцию на фрейме данных URLS, например:

result_df = urls_rdd.map(distributed_read_file)

и получить фрейм данных со столбцами: a и b (JSON ключи). когда я пытался это сделать, я возвращал каждый json объект в виде столбца MapType, и мне было трудно с этим работать.

Большое спасибо, надеюсь, это было понятно!

1 Ответ

0 голосов
/ 26 января 2020

Так что, если это кому-то поможет, я нашел действительно простое решение:

def distributed_read_gzip(url):
    s3_client = boto3.client('s3')
    result = s3_client.get_object(Bucket=raw_data_bucket_name, Key=url)
    bytestream = BytesIO(result['Body'].read())
    string_json = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
    for json_obj in json.loads(string_json):
        yield Row(**json_obj)

, в то время как вызов функции выполняется с плоской картой, потому что для каждого URL возвращается несколько строк:

new_rdd = urls_rdd.flatMap(distributed_read_gzip)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...