В настоящее время невозможно, чтобы спарк "изначально" записывал один файл в нужном вам формате, потому что спарк работает распределенным (параллельным) способом, при этом каждый исполнитель записывает свою часть данных независимо.
Однако, поскольку вы согласны с тем, чтобы каждый файл представлял собой массив json, а не только [один] файл , вот один из способов, который вы можете использовать для достижения желаемого результата:
from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct
df.select(to_json(struct(*df.columns)).alias("json"))\
.groupBy(spark_partition_id())\
.agg(collect_list("json").alias("json_list"))\
.select(col("json_list").cast("string"))\
.write.text("s3://path/to/json")
Сначала вы создаете json
из всех столбцов в df
. Затем сгруппируйте по идентификатору искрового раздела и агрегируйте, используя collect_list
. Это поместит все json
в этом разделе в список. Поскольку вы агрегируете данные внутри раздела, перестановка данных не требуется.
Теперь выберите столбец списка, преобразуйте его в строку и запишите его в виде текстового файла.
Вотпример того, как выглядит один файл:
[{"x":0.1420523746714616,"y":0.30876114874052263}, ... ]
Обратите внимание, что вы можете получить несколько пустых файлов.
Предположительно, вы можете заставить spark записывать данные в ОДИН файл, если вы указалипусто groupBy
, но это приведет к принудительному объединению всех данных в один раздел, что может привести к ошибке нехватки памяти.