Как написать один файл Json для каждой строки из кадра данных в Scala / Spark и переименовать файлы - PullRequest
0 голосов
/ 08 февраля 2019

Необходимо создать один файл json для каждой строки в кадре данных.Я использую PartitionBy, который создает подпапки для каждого файла.Есть ли способ избежать создания подпапок и переименовать файлы JSON с помощью уникального ключа?ИЛИ какие-нибудь другие альтернативы?Это огромный массив данных с тысячами (~ 300 КБ) уникальных значений, поэтому Repartition поглощает много ресурсов и занимает много времени. Спасибо.

df.select(Seq(col("UniqueField").as("UniqueField_Copy")) ++ 
df.columns.map(col): _*)       
.write.partitionBy("UniqueField")
.mode("overwrite").format("json").save("c:\temp\json\")

1 Ответ

0 голосов
/ 12 февраля 2019

Размещение всех выходных данных в одном каталоге

Ваш пример кода вызывает partitionBy для DataFrameWriter объекта.Документация говорит нам, что эта функция:

Распределяет выходные данные по заданным столбцам в файловой системе.Если указано, выходные данные размещаются в файловой системе аналогично схеме разбиения Hive.Например, когда мы разбиваем набор данных на год и месяц, структура каталога будет выглядеть следующим образом:

year = 2016 / month = 01 /

year = 2016 / month = 02 /

Вот почему вы получаете подкаталоги.Просто удалив вызов partitionBy, вы получите все выходные данные в одном каталоге.

Получение одной строки на файл

Spark SQL

У вас была правильная идея разбить ваши данныена UniqueField, поскольку Spark записывает один файл на раздел.Вместо использования раздела DataFrameWriter вы можете использовать

df.repartitionByRange(numberOfJson, $"UniqueField")

, чтобы получить нужное количество разделов, с одним JSON на раздел.Обратите внимание, что для этого необходимо заранее знать количество JSON, которое вы получите.Вы можете вычислить его с помощью

val numberOfJson = df.select(count($"UniqueField")).first.getAs[Long](0)

Однако это добавит к вашему запросу дополнительное действие, которое приведет к повторному вычислению набора данных весь .Похоже, ваш набор данных слишком велик, чтобы поместиться в памяти, поэтому вам нужно тщательно продумать, действительно ли кэширование (или контрольная точка) с помощью df.cache (или df.checkpoint) действительно экономит ваше время вычислений.(Для больших наборов данных, для создания которых не требуются интенсивные вычисления, повторное вычисление может быть на самом деле быстрее)

RDD

Альтернативой использованию Spark SQL API является переход на более низкий уровеньRDD.Разделение по ключу (в pyspark) для СДР подробно обсуждалось в ответе на на этот вопрос .В scala вам нужно указать пользовательский Partitioner, как описано в этом вопросе .

Переименование выходных файлов Spark

Это довольно распространенный вопрос, и AFAIK, согласие в том, что это невозможно.

Надеюсь, это поможет, и добро пожаловать в Stack Overflow!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...