записать спарк-фрейм как массив json (pyspark) - PullRequest
5 голосов
/ 04 октября 2019

Я хотел бы написать свой искровой фрейм данных в виде набора файлов JSON и, в частности, каждый из них в виде массива JSON. Позвольте мне объяснить с помощью простого (воспроизводимого) кода.

У нас есть:

import numpy as np
import pandas as pd
df = spark.createDataFrame(pd.DataFrame({'x': np.random.rand(100), 'y': np.random.rand(100)}))

Сохранение кадра данных как:

df.write.json('s3://path/to/json')

В каждом только что созданном файле по одному объекту JSON на строку, что-то вроде:

{"x":0.9953802385540144,"y":0.476027611419198}
{"x":0.929599290575914,"y":0.72878523939521}
{"x":0.951701684432855,"y":0.8008064729546504}

но мне хотелось бы иметь массив этих JSON для файла :

[
   {"x":0.9953802385540144,"y":0.476027611419198},
   {"x":0.929599290575914,"y":0.72878523939521},
   {"x":0.951701684432855,"y":0.8008064729546504}
]

1 Ответ

2 голосов
/ 04 октября 2019

В настоящее время невозможно, чтобы спарк "изначально" записывал один файл в нужном вам формате, потому что спарк работает распределенным (параллельным) способом, при этом каждый исполнитель записывает свою часть данных независимо.

Однако, поскольку вы согласны с тем, чтобы каждый файл представлял собой массив json, а не только [один] файл , вот один из способов, который вы можете использовать для достижения желаемого результата:

from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

df.select(to_json(struct(*df.columns)).alias("json"))\
    .groupBy(spark_partition_id())\
    .agg(collect_list("json").alias("json_list"))\
    .select(col("json_list").cast("string"))\
    .write.text("s3://path/to/json")

Сначала вы создаете json из всех столбцов в df. Затем сгруппируйте по идентификатору искрового раздела и агрегируйте, используя collect_list. Это поместит все json в этом разделе в список. Поскольку вы агрегируете данные внутри раздела, перестановка данных не требуется.

Теперь выберите столбец списка, преобразуйте его в строку и запишите его в виде текстового файла.

Вотпример того, как выглядит один файл:

[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]

Обратите внимание, что вы можете получить несколько пустых файлов.


Предположительно, вы можете заставить spark записывать данные в ОДИН файл, если вы указалипусто groupBy, но это приведет к принудительному объединению всех данных в один раздел, что может привести к ошибке нехватки памяти.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...